Index: src/test/org/apache/hadoop/hbase/TestHMsg.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestHMsg.java (revision 908352) +++ src/test/org/apache/hadoop/hbase/TestHMsg.java (working copy) @@ -19,13 +19,15 @@ */ package org.apache.hadoop.hbase; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import junit.framework.TestCase; + import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; -import junit.framework.TestCase; - public class TestHMsg extends TestCase { public void testList() { List msgs = new ArrayList(); @@ -52,4 +54,29 @@ new HRegionInfo(new HTableDescriptor(Bytes.toBytes("test")), b, b)); assertNotSame(-1, msgs.indexOf(hmsg)); } + + public void testSerialization() throws IOException { + // Check out new HMsg that carries two daughter split regions. + byte [] abytes = Bytes.toBytes("a"); + byte [] bbytes = Bytes.toBytes("b"); + byte [] parentbytes = Bytes.toBytes("parent"); + HRegionInfo parent = + new HRegionInfo(new HTableDescriptor(Bytes.toBytes("parent")), + parentbytes, parentbytes); + // Assert simple HMsg serializes + HMsg hmsg = new HMsg(HMsg.Type.MSG_REGION_CLOSE, parent); + byte [] bytes = Writables.getBytes(hmsg); + HMsg close = (HMsg)Writables.getWritable(bytes, new HMsg()); + assertTrue(close.equals(hmsg)); + // Assert split serializes + HRegionInfo daughtera = + new HRegionInfo(new HTableDescriptor(Bytes.toBytes("a")), abytes, abytes); + HRegionInfo daughterb = + new HRegionInfo(new HTableDescriptor(Bytes.toBytes("b")), bbytes, bbytes); + HMsg splithmsg = new HMsg(HMsg.Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS, + parent, daughtera, daughterb, Bytes.toBytes("split")); + bytes = Writables.getBytes(splithmsg); + hmsg = (HMsg)Writables.getWritable(bytes, new HMsg()); + assertTrue(splithmsg.equals(hmsg)); + } } Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 908352) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -116,6 +116,7 @@ static final Log LOG = LogFactory.getLog(HRegionServer.class); private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING); private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED); + private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {}; // Set when a report to the master comes back with a message asking us to // shutdown. Also set by call to stop when debugging or running unit tests @@ -151,8 +152,8 @@ new ConcurrentHashMap(); protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final List outboundMsgs = - Collections.synchronizedList(new ArrayList()); + private final LinkedBlockingQueue outboundMsgs = + new LinkedBlockingQueue(); final int numRetries; protected final int threadWakeFrequency; @@ -436,7 +437,7 @@ LOG.warn("No response from master on reportForDuty. Sleeping and " + "then trying again."); } - HMsg outboundArray[] = null; + List outboundMessages = new ArrayList(); long lastMsg = 0; // Now ask master what it wants us to do and tell it what we have done for (int tries = 0; !stopRequested.get() && isHealthy();) { @@ -457,10 +458,10 @@ LOG.warn("unable to report to master for " + (now - lastMsg) + " milliseconds - retrying"); } - // Send messages to the master IF this.msgInterval has elapsed OR if - // we have something to tell (and we didn't just fail sending master). - if ((now - lastMsg) >= msgInterval || - ((outboundArray == null || outboundArray.length == 0) && !this.outboundMsgs.isEmpty())) { + // Drop into the send loop if msgInterval has elapsed or if something + // to send. If we fail talking to the master, then we'll sleep below + // on poll of the outboundMsgs blockingqueue. + if ((now - lastMsg) >= msgInterval || !outboundMessages.isEmpty()) { try { doMetrics(); MemoryUsage memory = @@ -473,11 +474,13 @@ } this.serverInfo.setLoad(hsl); this.requestCount.set(0); - outboundArray = getOutboundMsgs(outboundArray); - HMsg msgs[] = hbaseMaster.regionServerReport( - serverInfo, outboundArray, getMostLoadedRegions()); + getOutboundMsgs(outboundMessages); + HMsg msgs[] = this.hbaseMaster.regionServerReport( + serverInfo, outboundMessages.toArray(EMPTY_HMSG_ARRAY), + getMostLoadedRegions()); lastMsg = System.currentTimeMillis(); - outboundArray = updateOutboundMsgs(outboundArray); + updateOutboundMsgs(outboundMessages); + outboundMessages.clear(); if (this.quiesced.get() && onlineRegions.size() == 0) { // We've just told the master we're exiting because we aren't // serving any regions. So set the stop bit and exit. @@ -589,9 +592,13 @@ lastMsg = System.currentTimeMillis(); } } - // Do some housekeeping before going to sleep + now = System.currentTimeMillis(); + HMsg msg = this.outboundMsgs.poll((msgInterval - (now - lastMsg)), + TimeUnit.MILLISECONDS); + // If we got something, add it to list of things to send. + if (msg != null) outboundMessages.add(msg); + // Do some housekeeping before going back around housekeeping(); - sleeper.sleep(lastMsg); } // for } catch (Throwable t) { if (!checkOOME(t)) { @@ -689,27 +696,36 @@ * @return Messages to send or returns current outboundMsgs if it already had * content to send. */ - private HMsg [] getOutboundMsgs(final HMsg [] msgs) { - // If passed msgs are not null, means we haven't passed them to master yet. - if (msgs != null) return msgs; - synchronized(this.outboundMsgs) { - return this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]); + private void getOutboundMsgs(final List msgs) { + if (msgs.isEmpty()) { + this.outboundMsgs.drainTo(msgs); + return; } + // Be careful don't add duplicates. + OUTER: for (HMsg m: this.outboundMsgs) { + for (HMsg mm: msgs) { + if (mm.equals(m)) { + // Then the outbound msgs already added to list of what to send. Skip. + continue OUTER; + } + } + msgs.add(m); + } } /* * @param msgs Messages we sent the master. - * @return Null */ - private HMsg [] updateOutboundMsgs(final HMsg [] msgs) { - if (msgs == null) return null; - synchronized(this.outboundMsgs) { - for (HMsg m: msgs) { - int index = this.outboundMsgs.indexOf(m); - if (index != -1) this.outboundMsgs.remove(index); + private void updateOutboundMsgs(final List msgs) { + if (msgs.isEmpty()) return; + for (HMsg m: this.outboundMsgs) { + for (HMsg mm: msgs) { + if (mm.equals(m)) { + this.outboundMsgs.remove(m); + break; + } } } - return null; } /** @@ -1203,8 +1219,7 @@ } /* - * Run some housekeeping tasks before we go into 'hibernation' sleeping at - * the end of the main HRegionServer run loop. + * Run some housekeeping tasks. */ private void housekeeping() { // If the todo list has > 0 messages, iterate looking for open region @@ -1356,7 +1371,7 @@ /* Add to the outbound message buffer */ private void reportOpen(HRegionInfo region) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region)); + this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region)); } /* Add to the outbound message buffer */ @@ -1366,7 +1381,7 @@ /* Add to the outbound message buffer */ private void reportClose(final HRegionInfo region, final byte[] message) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message)); + this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message)); } /** @@ -1381,12 +1396,11 @@ */ void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA, HRegionInfo newRegionB) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion, - ("Daughters; " + - newRegionA.getRegionNameAsString() + ", " + - newRegionB.getRegionNameAsString()).getBytes())); - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA)); - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB)); + this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS, + oldRegion, newRegionA, newRegionB, + Bytes.toBytes("Daughters; " + + newRegionA.getRegionNameAsString() + ", " + + newRegionB.getRegionNameAsString()))); } ////////////////////////////////////////////////////////////////////////////// @@ -2344,7 +2358,7 @@ /** * @return Queue to which you can add outbound messages. */ - protected List getOutboundMsgs() { + protected LinkedBlockingQueue getOutboundMsgs() { return this.outboundMsgs; } Index: src/java/org/apache/hadoop/hbase/master/ServerManager.java =================================================================== --- src/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 908352) +++ src/java/org/apache/hadoop/hbase/master/ServerManager.java (working copy) @@ -455,8 +455,14 @@ break; case MSG_REPORT_SPLIT: - processSplitRegion(region, incomingMsgs[++i], incomingMsgs[++i]); + processSplitRegion(region, incomingMsgs[++i].getRegionInfo(), + incomingMsgs[++i].getRegionInfo()); break; + + case MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS: + processSplitRegion(region, incomingMsgs[i].getDaughterA(), + incomingMsgs[i].getDaughterB()); + break; default: LOG.warn("Impossible state during message processing. Instruction: " + @@ -497,14 +503,14 @@ * @param splitB * @param returnMsgs */ - private void processSplitRegion(HRegionInfo region, HMsg splitA, HMsg splitB) { + private void processSplitRegion(HRegionInfo region, HRegionInfo a, HRegionInfo b) { synchronized (master.regionManager) { // Cancel any actions pending for the affected region. // This prevents the master from sending a SPLIT message if the table // has already split by the region server. master.regionManager.endActions(region.getRegionName()); - assignSplitDaughter(splitA.getRegionInfo()); - assignSplitDaughter(splitB.getRegionInfo()); + assignSplitDaughter(a); + assignSplitDaughter(b); if (region.isMetaTable()) { // A meta region has split. master.regionManager.offlineMetaRegion(region.getStartKey()); Index: src/java/org/apache/hadoop/hbase/HMsg.java =================================================================== --- src/java/org/apache/hadoop/hbase/HMsg.java (revision 908352) +++ src/java/org/apache/hadoop/hbase/HMsg.java (working copy) @@ -83,6 +83,7 @@ * * Note that this message is immediately followed by two MSG_REPORT_OPEN * messages, one for each of the new regions resulting from the split + * @deprecated See MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS */ MSG_REPORT_SPLIT, @@ -108,11 +109,21 @@ * Run Major Compaction */ MSG_REGION_MAJOR_COMPACT, + + /** + * Region server split the region associated with this message. + * + * Its like MSG_REPORT_SPLIT only it carries the daughters in the message + * rather than send them individually in MSG_REPORT_OPEN messages. + */ + MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS, } private Type type = null; private HRegionInfo info = null; private byte[] message = null; + private HRegionInfo daughterA = null; + private HRegionInfo daughterB = null; /** Default constructor. Used during deserialization */ public HMsg() { @@ -145,6 +156,21 @@ * @param msg Optional message (Stringified exception, etc.) */ public HMsg(final HMsg.Type type, final HRegionInfo hri, final byte[] msg) { + this(type, hri, null, null, msg); + } + + /** + * Construct a message with the specified message and HRegionInfo + * + * @param type Message type + * @param hri Region to which message type applies. Cannot be + * null. If no info associated, used other Constructor. + * @param daughterA + * @param daughterB + * @param msg Optional message (Stringified exception, etc.) + */ + public HMsg(final HMsg.Type type, final HRegionInfo hri, + final HRegionInfo daughterA, final HRegionInfo daughterB, final byte[] msg) { if (type == null) { throw new NullPointerException("Message type cannot be null"); } @@ -154,6 +180,8 @@ } this.info = hri; this.message = msg; + this.daughterA = daughterA; + this.daughterB = daughterB; } /** @@ -182,6 +210,22 @@ } /** + * @return First daughter if Type is MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS else + * null + */ + public HRegionInfo getDaughterA() { + return this.daughterA; + } + + /** + * @return Second daughter if Type is MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS else + * null + */ + public HRegionInfo getDaughterB() { + return this.daughterB; + } + + /** * @see java.lang.Object#toString() */ @Override @@ -247,6 +291,10 @@ out.writeBoolean(true); Bytes.writeByteArray(out, this.message); } + if (this.type.equals(Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS)) { + this.daughterA.write(out); + this.daughterB.write(out); + } } /** @@ -260,5 +308,11 @@ if (hasMessage) { this.message = Bytes.readByteArray(in); } + if (this.type.equals(Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS)) { + this.daughterA = new HRegionInfo(); + this.daughterB = new HRegionInfo(); + this.daughterA.readFields(in); + this.daughterB.readFields(in); + } } -} +} \ No newline at end of file