Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 908352) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -151,8 +151,8 @@ new ConcurrentHashMap(); protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final List outboundMsgs = - Collections.synchronizedList(new ArrayList()); + private final LinkedBlockingQueue outboundMsgs = + new LinkedBlockingQueue(); final int numRetries; protected final int threadWakeFrequency; @@ -477,7 +477,8 @@ HMsg msgs[] = hbaseMaster.regionServerReport( serverInfo, outboundArray, getMostLoadedRegions()); lastMsg = System.currentTimeMillis(); - outboundArray = updateOutboundMsgs(outboundArray); + updateOutboundMsgs(outboundArray); + outboundArray = null; if (this.quiesced.get() && onlineRegions.size() == 0) { // We've just told the master we're exiting because we aren't // serving any regions. So set the stop bit and exit. @@ -591,7 +592,9 @@ } // Do some housekeeping before going to sleep housekeeping(); - sleeper.sleep(lastMsg); + now = System.currentTimeMillis(); + outboundMsgs.poll((msgInterval - (now - lastMsg)), + TimeUnit.MILLISECONDS); } // for } catch (Throwable t) { if (!checkOOME(t)) { @@ -699,17 +702,17 @@ /* * @param msgs Messages we sent the master. - * @return Null */ - private HMsg [] updateOutboundMsgs(final HMsg [] msgs) { - if (msgs == null) return null; - synchronized(this.outboundMsgs) { - for (HMsg m: msgs) { - int index = this.outboundMsgs.indexOf(m); - if (index != -1) this.outboundMsgs.remove(index); + private void updateOutboundMsgs(final HMsg [] msgs) { + if (msgs == null) return; + for (HMsg m: this.outboundMsgs) { + for (HMsg mm: msgs) { + if (mm.equals(m)) { + this.outboundMsgs.remove(m); + break; + } } } - return null; } /** @@ -1356,7 +1359,7 @@ /* Add to the outbound message buffer */ private void reportOpen(HRegionInfo region) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region)); + this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region)); } /* Add to the outbound message buffer */ @@ -1366,7 +1369,7 @@ /* Add to the outbound message buffer */ private void reportClose(final HRegionInfo region, final byte[] message) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message)); + this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message)); } /** @@ -1381,12 +1384,14 @@ */ void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA, HRegionInfo newRegionB) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion, - ("Daughters; " + - newRegionA.getRegionNameAsString() + ", " + - newRegionB.getRegionNameAsString()).getBytes())); - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA)); - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB)); + synchronized (outboundMsgs) { + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion, + ("Daughters; " + + newRegionA.getRegionNameAsString() + ", " + + newRegionB.getRegionNameAsString()).getBytes())); + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA)); + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB)); + } } ////////////////////////////////////////////////////////////////////////////// @@ -2344,7 +2349,7 @@ /** * @return Queue to which you can add outbound messages. */ - protected List getOutboundMsgs() { + protected LinkedBlockingQueue getOutboundMsgs() { return this.outboundMsgs; } Index: src/java/org/apache/hadoop/hbase/master/ServerManager.java =================================================================== --- src/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 908352) +++ src/java/org/apache/hadoop/hbase/master/ServerManager.java (working copy) @@ -455,8 +455,14 @@ break; case MSG_REPORT_SPLIT: - processSplitRegion(region, incomingMsgs[++i], incomingMsgs[++i]); + processSplitRegion(region, incomingMsgs[++i].getRegionInfo(), + incomingMsgs[++i].getRegionInfo()); break; + + case MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS: + processSplitRegion(region, incomingMsgs[i].getDaughterA(), + incomingMsgs[i].getDaughterB()); + break; default: LOG.warn("Impossible state during message processing. Instruction: " + @@ -497,14 +503,14 @@ * @param splitB * @param returnMsgs */ - private void processSplitRegion(HRegionInfo region, HMsg splitA, HMsg splitB) { + private void processSplitRegion(HRegionInfo region, HRegionInfo a, HRegionInfo b) { synchronized (master.regionManager) { // Cancel any actions pending for the affected region. // This prevents the master from sending a SPLIT message if the table // has already split by the region server. master.regionManager.endActions(region.getRegionName()); - assignSplitDaughter(splitA.getRegionInfo()); - assignSplitDaughter(splitB.getRegionInfo()); + assignSplitDaughter(a); + assignSplitDaughter(b); if (region.isMetaTable()) { // A meta region has split. master.regionManager.offlineMetaRegion(region.getStartKey()); Index: src/java/org/apache/hadoop/hbase/HMsg.java =================================================================== --- src/java/org/apache/hadoop/hbase/HMsg.java (revision 908352) +++ src/java/org/apache/hadoop/hbase/HMsg.java (working copy) @@ -108,11 +108,21 @@ * Run Major Compaction */ MSG_REGION_MAJOR_COMPACT, + + /** + * Region server split the region associated with this message. + * + * Its like MSG_REPORT_SPLIT only it carries the daughters in the message + * rather than send them individually in MSG_REPORT_OPEN messages. + */ + MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS, } private Type type = null; private HRegionInfo info = null; private byte[] message = null; + private HRegionInfo daughterA = null; + private HRegionInfo daughterB = null; /** Default constructor. Used during deserialization */ public HMsg() { @@ -145,6 +155,21 @@ * @param msg Optional message (Stringified exception, etc.) */ public HMsg(final HMsg.Type type, final HRegionInfo hri, final byte[] msg) { + this(type, hri, null, null, msg); + } + + /** + * Construct a message with the specified message and HRegionInfo + * + * @param type Message type + * @param hri Region to which message type applies. Cannot be + * null. If no info associated, used other Constructor. + * @param daughterA + * @param daughterB + * @param msg Optional message (Stringified exception, etc.) + */ + public HMsg(final HMsg.Type type, final HRegionInfo hri, + final HRegionInfo daughterA, final HRegionInfo daughterB, final byte[] msg) { if (type == null) { throw new NullPointerException("Message type cannot be null"); } @@ -154,6 +179,8 @@ } this.info = hri; this.message = msg; + this.daughterA = daughterA; + this.daughterB = daughterB; } /** @@ -182,6 +209,22 @@ } /** + * @return First daughter if Type is MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS else + * null + */ + public HRegionInfo getDaughterA() { + return this.daughterA; + } + + /** + * @return Second daughter if Type is MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS else + * null + */ + public HRegionInfo getDaughterB() { + return this.daughterB; + } + + /** * @see java.lang.Object#toString() */ @Override @@ -247,6 +290,10 @@ out.writeBoolean(true); Bytes.writeByteArray(out, this.message); } + if (this.type.equals(Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS)) { + this.daughterA.write(out); + this.daughterB.write(out); + } } /** @@ -260,5 +307,9 @@ if (hasMessage) { this.message = Bytes.readByteArray(in); } + if (this.type.equals(Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS)) { + this.daughterA.readFields(in); + this.daughterB.readFields(in); + } } -} +} \ No newline at end of file