diff --git a/src/main/java/org/apache/hadoop/hbase/HMsg.java b/src/main/java/org/apache/hadoop/hbase/HMsg.java deleted file mode 100644 index c53460f..0000000 --- a/src/main/java/org/apache/hadoop/hbase/HMsg.java +++ /dev/null @@ -1,256 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Writable; - -/** - * HMsg is used to send messages between master and regionservers. Messages are - * sent as payload on the regionserver-to-master heartbeats. Region assignment - * does not use this mechanism. It goes via zookeeper. - * - *

Most of the time the messages are simple but some messages are accompanied - * by the region affected. HMsg may also carry an optional message. - * - *

TODO: Clean out all messages that go from master to regionserver; by - * design, these are to go via zk from here on out. - */ -public class HMsg implements Writable { - public static final HMsg [] STOP_REGIONSERVER_ARRAY = - new HMsg [] {new HMsg(Type.STOP_REGIONSERVER)}; - public static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg[0]; - - public static enum Type { - /** Master tells region server to stop. - */ - STOP_REGIONSERVER, - - /** - * Region server split the region associated with this message. - */ - REGION_SPLIT, - - /** - * When RegionServer receives this message, it goes into a sleep that only - * an exit will cure. This message is sent by unit tests simulating - * pathological states. - */ - TESTING_BLOCK_REGIONSERVER, - } - - private Type type = null; - private HRegionInfo info = null; - private byte[] message = null; - private HRegionInfo daughterA = null; - private HRegionInfo daughterB = null; - - /** Default constructor. Used during deserialization */ - public HMsg() { - this(null); - } - - /** - * Construct a message with the specified message and empty HRegionInfo - * @param type Message type - */ - public HMsg(final HMsg.Type type) { - this(type, new HRegionInfo(), null); - } - - /** - * Construct a message with the specified message and HRegionInfo - * @param type Message type - * @param hri Region to which message type applies - */ - public HMsg(final HMsg.Type type, final HRegionInfo hri) { - this(type, hri, null); - } - - /** - * Construct a message with the specified message and HRegionInfo - * - * @param type Message type - * @param hri Region to which message type applies. Cannot be - * null. If no info associated, used other Constructor. - * @param msg Optional message (Stringified exception, etc.) - */ - public HMsg(final HMsg.Type type, final HRegionInfo hri, final byte[] msg) { - this(type, hri, null, null, msg); - } - - /** - * Construct a message with the specified message and HRegionInfo - * - * @param type Message type - * @param hri Region to which message type applies. Cannot be - * null. If no info associated, used other Constructor. - * @param daughterA - * @param daughterB - * @param msg Optional message (Stringified exception, etc.) - */ - public HMsg(final HMsg.Type type, final HRegionInfo hri, - final HRegionInfo daughterA, final HRegionInfo daughterB, final byte[] msg) { - this.type = type; - if (hri == null) { - throw new NullPointerException("Region cannot be null"); - } - this.info = hri; - this.message = msg; - this.daughterA = daughterA; - this.daughterB = daughterB; - } - - /** - * @return Region info or null if none associated with this message type. - */ - public HRegionInfo getRegionInfo() { - return this.info; - } - - /** @return the type of message */ - public Type getType() { - return this.type; - } - - /** - * @param other Message type to compare to - * @return True if we are of same message type as other - */ - public boolean isType(final HMsg.Type other) { - return this.type.equals(other); - } - - /** @return the message type */ - public byte[] getMessage() { - return this.message; - } - - /** - * @return First daughter if Type is MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS else - * null - */ - public HRegionInfo getDaughterA() { - return this.daughterA; - } - - /** - * @return Second daughter if Type is MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS else - * null - */ - public HRegionInfo getDaughterB() { - return this.daughterB; - } - - /** - * @see java.lang.Object#toString() - */ - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(this.type.toString()); - // If null or empty region, don't bother printing it out. - if (this.info != null && this.info.getRegionName().length > 0) { - sb.append(": "); - sb.append(this.info.getRegionNameAsString()); - } - if (this.message != null && this.message.length > 0) { - sb.append(": " + Bytes.toString(this.message)); - } - return sb.toString(); - } - - /** - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - HMsg that = (HMsg)obj; - return this.type.equals(that.type) && - (this.info != null)? this.info.equals(that.info): - that.info == null; - } - - /** - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() { - int result = this.type.hashCode(); - if (this.info != null) { - result ^= this.info.hashCode(); - } - return result; - } - - // //////////////////////////////////////////////////////////////////////////// - // Writable - ////////////////////////////////////////////////////////////////////////////// - - /** - * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) - */ - public void write(DataOutput out) throws IOException { - out.writeInt(this.type.ordinal()); - this.info.write(out); - if (this.message == null || this.message.length == 0) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - Bytes.writeByteArray(out, this.message); - } - if (this.type.equals(Type.REGION_SPLIT)) { - this.daughterA.write(out); - this.daughterB.write(out); - } - } - - /** - * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) - */ - public void readFields(DataInput in) throws IOException { - int ordinal = in.readInt(); - this.type = HMsg.Type.values()[ordinal]; - this.info.readFields(in); - boolean hasMessage = in.readBoolean(); - if (hasMessage) { - this.message = Bytes.readByteArray(in); - } - if (this.type.equals(Type.REGION_SPLIT)) { - this.daughterA = new HRegionInfo(); - this.daughterB = new HRegionInfo(); - this.daughterA.readFields(in); - this.daughterB.readFields(in); - } - } -} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java b/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java index 7b8f193..7f3c9b0 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java +++ b/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java @@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HMsg; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HServerInfo; @@ -146,8 +145,14 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur // Hbase types addToMap(HColumnDescriptor.class, code++); addToMap(HConstants.Modify.class, code++); - addToMap(HMsg.class, code++); - addToMap(HMsg[].class, code++); + + // We used to have a class named HMsg but its been removed. Rather than + // just axe it, use following random Integer class -- we just chose any + // class from java.lang -- instead just so codes that follow stay + // in same relative place. + addToMap(Integer.class, code++); + addToMap(Integer[].class, code++); + addToMap(HRegion.class, code++); addToMap(HRegion[].class, code++); addToMap(HRegionInfo.class, code++); diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java b/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java index 25139b3..0209dea 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java @@ -19,22 +19,14 @@ */ package org.apache.hadoop.hbase.ipc; -import org.apache.hadoop.hbase.HMsg; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HServerInfo; +import java.io.IOException; + import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.ipc.VersionedProtocol; -import java.io.IOException; - /** - * HRegionServers interact with the HMasterRegionInterface to report on local - * goings-on and to obtain data-handling instructions from the HMaster. - *

Changes here need to be reflected in HbaseObjectWritable HbaseRPC#Invoker. - * - *

NOTE: if you change the interface, you must change the RPC version - * number in HBaseRPCProtocolVersion - * + * The Master publishes this Interface for RegionServers to register themselves + * on. */ public interface HMasterRegionInterface extends VersionedProtocol { /** @@ -44,32 +36,18 @@ public interface HMasterRegionInterface extends VersionedProtocol { // maintained a single global version number on all HBase Interfaces. This // meant all HBase RPC was broke though only one of the three RPC Interfaces // had changed. This has since been undone. - public static final long VERSION = 28L; + public static final long VERSION = 29L; /** * Called when a region server first starts - * @param info server info + * @param port Port number this regionserver is up on. + * @param serverStartcode This servers startcode. * @param serverCurrentTime The current time of the region server in ms * @throws IOException e * @return Configuration for the regionserver to use: e.g. filesystem, * hbase rootdir, etc. */ - public MapWritable regionServerStartup(HServerInfo info, - long serverCurrentTime) throws IOException; - - /** - * Called to renew lease, tell master what the region server is doing and to - * receive new instructions from the master - * - * @param info server's address and start code - * @param msgs things the region server wants to tell the master - * @param mostLoadedRegions Array of HRegionInfos that should contain the - * reporting server's most loaded regions. These are candidates for being - * rebalanced. - * @return instructions from the master to the region server - * @throws IOException e - */ - public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[], - HRegionInfo mostLoadedRegions[]) + public MapWritable regionServerStartup(final int port, + final long serverStartcode, final long serverCurrentTime) throws IOException; } \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 3f3b696..cfa1e21 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HMsg; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HServerInfo; @@ -601,8 +600,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { } @Override - public MapWritable regionServerStartup(final HServerInfo serverInfo, - final long serverCurrentTime) + public MapWritable regionServerStartup(final int port, + final long serverStartCode, final long serverCurrentTime) throws IOException { // Set the ip into the passed in serverInfo. Its ip is more than likely // not the ip that the master sees here. See at end of this method where @@ -610,16 +609,15 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { // Everafter, the HSI combination 'server name' is what uniquely identifies // the incoming RegionServer. InetSocketAddress address = new InetSocketAddress( - HBaseServer.getRemoteIp().getHostName(), - serverInfo.getServerAddress().getPort()); - serverInfo.setServerAddress(new HServerAddress(address)); + HBaseServer.getRemoteIp().getHostName(), port); // Register with server manager - this.serverManager.regionServerStartup(serverInfo, serverCurrentTime); + this.serverManager.regionServerStartup(address, serverStartCode, + serverCurrentTime); // Send back some config info MapWritable mw = createConfigurationSubset(); - mw.put(new Text("hbase.regionserver.address"), - serverInfo.getServerAddress()); + mw.put(new Text("hbase.regionserver.hostname"), + new Text(address.getHostName())); return mw; } @@ -637,26 +635,6 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { return mw; } - @Override - public HMsg [] regionServerReport(HServerInfo serverInfo, HMsg msgs[], - HRegionInfo[] mostLoadedRegions) - throws IOException { - return adornRegionServerAnswer(serverInfo, - this.serverManager.regionServerReport(serverInfo, msgs, mostLoadedRegions)); - } - - /** - * Override if you'd add messages to return to regionserver hsi - * or to send an exception. - * @param msgs Messages to add to - * @return Messages to return to - * @throws IOException exceptions that were injected for the region servers - */ - protected HMsg [] adornRegionServerAnswer(final HServerInfo hsi, - final HMsg [] msgs) throws IOException { - return msgs; - } - public boolean isMasterRunning() { return !isStopped(); } diff --git a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 4d921da..10edcfb 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -110,11 +111,13 @@ public class ServerManager { /** * Let the server manager know a new regionserver has come online - * @param serverInfo + * @param address + * @param serverStartcode * @param serverCurrentTime The current time of the region server in ms * @throws IOException */ - void regionServerStartup(final HServerInfo serverInfo, long serverCurrentTime) + void regionServerStartup(final InetSocketAddress address, + final long serverStartcode, long serverCurrentTime) throws IOException { // Test for case where we get a region startup message from a regionserver // that has been quickly restarted but whose znode expiration handler has @@ -123,28 +126,32 @@ public class ServerManager { // is, reject the server and trigger its expiration. The next time it comes // in, it should have been removed from serverAddressToServerInfo and queued // for processing by ProcessServerShutdown. - HServerInfo info = new HServerInfo(serverInfo); - checkIsDead(info.getServerName(), "STARTUP"); - checkAlreadySameHostPort(info); - checkClockSkew(info, serverCurrentTime); - recordNewServer(info, false, null); + String serverName = HServerInfo.getServerName(address.getHostName(), + address.getPort(), serverStartcode); + checkIsDead(serverName, "STARTUP"); + String hostnamePort = + HServerInfo.getHostnamePort(address.getHostName(), address.getPort()); + checkAlreadySameHostPort(hostnamePort, serverStartcode); + checkClockSkew(serverName, serverCurrentTime); + recordNewServer(serverName); } /** * Test to see if we have a server of same host and port already. - * @param serverInfo + * @param hostAndPort + * @param serverStartcode * @throws PleaseHoldException */ - void checkAlreadySameHostPort(final HServerInfo serverInfo) + void checkAlreadySameHostPort(final String hostAndPort, + final long serverStartcode) throws PleaseHoldException { - String hostAndPort = serverInfo.getServerAddress().toString(); - HServerInfo existingServer = - haveServerWithSameHostAndPortAlready(serverInfo.getHostnamePort()); + HServerInfo existingServer = haveServerWithSameHostAndPortAlready(hostAndPort); if (existingServer != null) { String message = "Server start rejected; we already have " + hostAndPort + - " registered; existingServer=" + existingServer + ", newServer=" + serverInfo; + " registered; existingServer=" + existingServer + + ", newServer=" + hostAndPort + ", startCode=" + serverStartcode; LOG.info(message); - if (existingServer.getStartCode() < serverInfo.getStartCode()) { + if (existingServer.getStartCode() < serverStartcode) { LOG.info("Triggering server recovery; existingServer " + existingServer.getServerName() + " looks stale"); expireServer(existingServer); @@ -167,14 +174,16 @@ public class ServerManager { /** * Checks if the clock skew between the server and the master. If the clock * skew is too much it will throw an Exception. + * @param serverName Incoming servers's name + * @param serverCurrentTime * @throws ClockOutOfSyncException */ - private void checkClockSkew(final HServerInfo serverInfo, + private void checkClockSkew(final String serverName, final long serverCurrentTime) throws ClockOutOfSyncException { long skew = System.currentTimeMillis() - serverCurrentTime; if (skew > maxSkew) { - String message = "Server " + serverInfo.getServerName() + " has been " + + String message = "Server " + serverName + " has been " + "rejected; Reported time is too far out of sync with master. " + "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms"; LOG.warn(message); @@ -199,148 +208,13 @@ public class ServerManager { /** * Adds the HSI to the RS list - * @param info The region server informations - * @param useInfoLoad True if the load from the info should be used; e.g. - * under a master failover + * @param serverName The remote servers name. * @param hri Region interface. Can be null. */ - void recordNewServer(HServerInfo info, boolean useInfoLoad, - HRegionInterface hri) { - HServerLoad load = useInfoLoad? info.getLoad(): new HServerLoad(); - String serverName = info.getServerName(); - LOG.info("Registering server=" + serverName + ", regionCount=" + - load.getLoad() + ", userLoad=" + useInfoLoad); - info.setLoad(load); - // TODO: Why did we update the RS location ourself? Shouldn't RS do this? - // masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher); - // -- If I understand the question, the RS does not update the location - // because could be disagreement over locations because of DNS issues; only - // master does DNS now -- St.Ack 20100929. + void recordNewServer(final String serverName) { + LOG.info("Registering server=" + serverName); this.onlineServers.put(serverName, info); - if (hri == null) { - serverConnections.remove(serverName); - } else { - serverConnections.put(serverName, hri); - } - } - - /** - * Called to process the messages sent from the region server to the master - * along with the heart beat. - * - * @param serverInfo - * @param msgs - * @param mostLoadedRegions Array of regions the region server is submitting - * as candidates to be rebalanced, should it be overloaded - * @return messages from master to region server indicating what region - * server should do. - * - * @throws IOException - */ - HMsg [] regionServerReport(final HServerInfo serverInfo, - final HMsg [] msgs, final HRegionInfo[] mostLoadedRegions) - throws IOException { - // Be careful. This method does returns in the middle. - HServerInfo info = new HServerInfo(serverInfo); - - // Check if dead. If it is, it'll get a 'You Are Dead!' exception. - checkIsDead(info.getServerName(), "REPORT"); - - // If we don't know this server, tell it shutdown. - HServerInfo storedInfo = this.onlineServers.get(info.getServerName()); - if (storedInfo == null) { - // Maybe we already have this host+port combo and its just different - // start code? - checkAlreadySameHostPort(info); - // Just let the server in. Presume master joining a running cluster. - // recordNewServer is what happens at the end of reportServerStartup. - // The only thing we are skipping is passing back to the regionserver - // the HServerInfo to use. Here we presume a master has already done - // that so we'll press on with whatever it gave us for HSI. - recordNewServer(info, true, null); - // If msgs, put off their processing but this is not enough because - // its possible that the next time the server reports in, we'll still - // not be up and serving. For example, if a split, we'll need the - // regions and servers setup in the master before the below - // handleSplitReport will work. TODO: FIx!! - if (msgs.length > 0) - throw new PleaseHoldException("FIX! Putting off " + - "message processing because not yet rwady but possible we won't be " + - "ready next on next report"); - } - - // Check startcodes - if (raceThatShouldNotHappenAnymore(storedInfo, info)) { - return HMsg.STOP_REGIONSERVER_ARRAY; - } - - for (HMsg msg: msgs) { - LOG.info("Received " + msg + " from " + serverInfo.getServerName()); - switch (msg.getType()) { - case REGION_SPLIT: - this.services.getAssignmentManager().handleSplitReport(serverInfo, - msg.getRegionInfo(), msg.getDaughterA(), msg.getDaughterB()); - break; - - default: - LOG.error("Unhandled msg type " + msg); - } - } - - HMsg [] reply = null; - int numservers = countOfRegionServers(); - if (this.clusterShutdown) { - if (numservers <= 2) { - // Shutdown needs to be staggered; the meta regions need to close last - // in case they need to be updated during the close melee. If <= 2 - // servers left, then these are the two that were carrying root and meta - // most likely (TODO: This presumes unsplittable meta -- FIX). Tell - // these servers can shutdown now too. - reply = HMsg.STOP_REGIONSERVER_ARRAY; - } - } - return processRegionServerAllsWell(info, mostLoadedRegions, reply); - } - - private boolean raceThatShouldNotHappenAnymore(final HServerInfo storedInfo, - final HServerInfo reportedInfo) { - if (storedInfo.getStartCode() != reportedInfo.getStartCode()) { - // TODO: I don't think this possible any more. We check startcodes when - // server comes in on regionServerStartup -- St.Ack - // This state is reachable if: - // 1) RegionServer A started - // 2) RegionServer B started on the same machine, then clobbered A in regionServerStartup. - // 3) RegionServer A returns, expecting to work as usual. - // The answer is to ask A to shut down for good. - LOG.warn("Race condition detected: " + reportedInfo.getServerName()); - synchronized (this.onlineServers) { - removeServerInfo(reportedInfo.getServerName()); - notifyOnlineServers(); - } - return true; - } - return false; - } - - /** - * RegionServer is checking in, no exceptional circumstances - * @param serverInfo - * @param mostLoadedRegions - * @param msgs - * @return - * @throws IOException - */ - private HMsg[] processRegionServerAllsWell(HServerInfo serverInfo, - final HRegionInfo[] mostLoadedRegions, HMsg[] msgs) - throws IOException { - // Refresh the info object and the load information - this.onlineServers.put(serverInfo.getServerName(), serverInfo); - HServerLoad load = serverInfo.getLoad(); - if (load != null && this.metrics != null) { - this.metrics.incrementRequests(load.getNumberOfRequests()); - } - // No more piggyback messages on heartbeats for other stuff - return msgs; + this.serverConnections.remove(serverName); } /** diff --git a/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index 9ad3697..f5fba08 100644 --- a/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -21,11 +21,8 @@ package org.apache.hadoop.hbase; import java.io.IOException; import java.security.PrivilegedAction; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,9 +36,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.io.MapWritable; -import org.apache.zookeeper.KeeperException; /** * This class creates a single process HBase cluster. @@ -86,75 +81,6 @@ public class MiniHBaseCluster { } /** - * Override Master so can add inject behaviors testing. - */ - public static class MiniHBaseClusterMaster extends HMaster { - private final Map> messages = - new ConcurrentHashMap>(); - - private final Map exceptions = - new ConcurrentHashMap(); - - public MiniHBaseClusterMaster(final Configuration conf) - throws IOException, KeeperException, InterruptedException { - super(conf); - } - - /** - * Add a message to send to a regionserver next time it checks in. - * @param hsi RegionServer's HServerInfo. - * @param msg Message to add. - */ - void addMessage(final HServerInfo hsi, HMsg msg) { - synchronized(this.messages) { - List hmsgs = this.messages.get(hsi); - if (hmsgs == null) { - hmsgs = new ArrayList(); - this.messages.put(hsi, hmsgs); - } - hmsgs.add(msg); - } - } - - void addException(final HServerInfo hsi, final IOException ex) { - this.exceptions.put(hsi, ex); - } - - /** - * This implementation is special, exceptions will be treated first and - * message won't be sent back to the region servers even if some are - * specified. - * @param hsi the rs - * @param msgs Messages to add to - * @return - * @throws IOException will be throw if any added for this region server - */ - @Override - protected HMsg[] adornRegionServerAnswer(final HServerInfo hsi, - final HMsg[] msgs) throws IOException { - IOException ex = this.exceptions.remove(hsi); - if (ex != null) { - throw ex; - } - HMsg [] answerMsgs = msgs; - synchronized (this.messages) { - List hmsgs = this.messages.get(hsi); - if (hmsgs != null && !hmsgs.isEmpty()) { - int size = answerMsgs.length; - HMsg [] newAnswerMsgs = new HMsg[size + hmsgs.size()]; - System.arraycopy(answerMsgs, 0, newAnswerMsgs, 0, answerMsgs.length); - for (int i = 0; i < hmsgs.size(); i++) { - newAnswerMsgs[answerMsgs.length + i] = hmsgs.get(i); - } - answerMsgs = newAnswerMsgs; - hmsgs.clear(); - } - } - return super.adornRegionServerAnswer(hsi, answerMsgs); - } - } - - /** * Subclass so can get at protected methods (none at moment). Also, creates * a FileSystem instance per instantiation. Adds a shutdown own FileSystem * on the way out. Shuts down own Filesystem only, not All filesystems as @@ -260,8 +186,7 @@ public class MiniHBaseCluster { try { // start up a LocalHBaseCluster hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0, - MiniHBaseCluster.MiniHBaseClusterMaster.class, - MiniHBaseCluster.MiniHBaseClusterRegionServer.class); + HMaster.class, MiniHBaseCluster.MiniHBaseClusterRegionServer.class); // manually add the regionservers as other users for (int i=0; i msgs = new ArrayList(); - HMsg hmsg = null; - final int size = 10; - for (int i = 0; i < size; i++) { - byte [] b = Bytes.toBytes(i); - hmsg = new HMsg(HMsg.Type.STOP_REGIONSERVER, - new HRegionInfo(new HTableDescriptor(Bytes.toBytes("test")), b, b)); - msgs.add(hmsg); - } - assertEquals(size, msgs.size()); - int index = msgs.indexOf(hmsg); - assertNotSame(-1, index); - msgs.remove(index); - assertEquals(size - 1, msgs.size()); - byte [] other = Bytes.toBytes("other"); - hmsg = new HMsg(HMsg.Type.STOP_REGIONSERVER, - new HRegionInfo(new HTableDescriptor(Bytes.toBytes("test")), other, other)); - assertEquals(-1, msgs.indexOf(hmsg)); - // Assert that two HMsgs are same if same content. - byte [] b = Bytes.toBytes(1); - hmsg = new HMsg(HMsg.Type.STOP_REGIONSERVER, - new HRegionInfo(new HTableDescriptor(Bytes.toBytes("test")), b, b)); - assertNotSame(-1, msgs.indexOf(hmsg)); - } - - public void testSerialization() throws IOException { - // Check out new HMsg that carries two daughter split regions. - byte [] abytes = Bytes.toBytes("a"); - byte [] bbytes = Bytes.toBytes("b"); - byte [] parentbytes = Bytes.toBytes("parent"); - HRegionInfo parent = - new HRegionInfo(new HTableDescriptor(Bytes.toBytes("parent")), - parentbytes, parentbytes); - // Assert simple HMsg serializes - HMsg hmsg = new HMsg(HMsg.Type.STOP_REGIONSERVER, parent); - byte [] bytes = Writables.getBytes(hmsg); - HMsg close = (HMsg)Writables.getWritable(bytes, new HMsg()); - assertTrue(close.equals(hmsg)); - // Assert split serializes - HRegionInfo daughtera = - new HRegionInfo(new HTableDescriptor(Bytes.toBytes("a")), abytes, abytes); - HRegionInfo daughterb = - new HRegionInfo(new HTableDescriptor(Bytes.toBytes("b")), bbytes, bbytes); - HMsg splithmsg = new HMsg(HMsg.Type.REGION_SPLIT, - parent, daughtera, daughterb, Bytes.toBytes("REGION_SPLIT")); - bytes = Writables.getBytes(splithmsg); - hmsg = (HMsg)Writables.getWritable(bytes, new HMsg()); - assertTrue(splithmsg.equals(hmsg)); - } -} diff --git a/src/test/java/org/apache/hadoop/hbase/TestSerialization.java b/src/test/java/org/apache/hadoop/hbase/TestSerialization.java index befcdaf..da1d5a7 100644 --- a/src/test/java/org/apache/hadoop/hbase/TestSerialization.java +++ b/src/test/java/org/apache/hadoop/hbase/TestSerialization.java @@ -84,21 +84,6 @@ public class TestSerialization { assertTrue(Bytes.equals("value".getBytes(), hmw.get("key".getBytes()))); } - @Test public void testHMsg() throws Exception { - final String name = "testHMsg"; - HMsg m = new HMsg(HMsg.Type.STOP_REGIONSERVER); - byte [] mb = Writables.getBytes(m); - HMsg deserializedHMsg = (HMsg)Writables.getWritable(mb, new HMsg()); - assertTrue(m.equals(deserializedHMsg)); - m = new HMsg(HMsg.Type.STOP_REGIONSERVER, - new HRegionInfo(new HTableDescriptor(name), - HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY), - "Some message".getBytes()); - mb = Writables.getBytes(m); - deserializedHMsg = (HMsg)Writables.getWritable(mb, new HMsg()); - assertTrue(m.equals(deserializedHMsg)); - } - @Test public void testTableDescriptor() throws Exception { final String name = "testTableDescriptor"; HTableDescriptor htd = createTableDescriptor(name); diff --git a/src/test/java/org/apache/hadoop/hbase/master/OOMEHMaster.java b/src/test/java/org/apache/hadoop/hbase/master/OOMEHMaster.java deleted file mode 100644 index bf5ed03..0000000 --- a/src/test/java/org/apache/hadoop/hbase/master/OOMEHMaster.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HMsg; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HServerInfo; -import org.apache.zookeeper.KeeperException; - -/** - * An HMaster that runs out of memory. - * Everytime a region server reports in, add to the retained heap of memory. - * Needs to be started manually as in - * ${HBASE_HOME}/bin/hbase ./bin/hbase org.apache.hadoop.hbase.OOMEHMaster start/code>. - */ -public class OOMEHMaster extends HMaster { - private List retainer = new ArrayList(); - - public OOMEHMaster(HBaseConfiguration conf) - throws IOException, KeeperException, InterruptedException { - super(conf); - } - - @Override - public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg[] msgs, - HRegionInfo[] mostLoadedRegions) - throws IOException { - // Retain 1M. - this.retainer.add(new byte [1024 * 1024]); - return super.regionServerReport(serverInfo, msgs, mostLoadedRegions); - } - - public static void main(String[] args) throws Exception { - new HMasterCommandLine(OOMEHMaster.class).doMain(args); - } -}