Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 907382) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -47,6 +47,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -220,6 +221,11 @@ // A sleeper that sleeps for msgInterval. private final Sleeper sleeper; + private final ArrayBlockingQueue outboundMsgSignal = + new ArrayBlockingQueue(1000); + + private final Object signaler = new Object(); + private final long rpcTimeout; // Address passed in to constructor. @@ -591,7 +597,9 @@ } // Do some housekeeping before going to sleep housekeeping(); - sleeper.sleep(lastMsg); + now = System.currentTimeMillis(); + outboundMsgSignal.poll((msgInterval - (now - lastMsg)), + TimeUnit.MILLISECONDS); } // for } catch (Throwable t) { if (!checkOOME(t)) { @@ -693,6 +701,7 @@ // If passed msgs are not null, means we haven't passed them to master yet. if (msgs != null) return msgs; synchronized(this.outboundMsgs) { + this.outboundMsgSignal.clear(); return this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]); } } @@ -1357,6 +1366,10 @@ /* Add to the outbound message buffer */ private void reportOpen(HRegionInfo region) { outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region)); + try { + outboundMsgSignal.put(signaler); + } catch (InterruptedException e) { + } } /* Add to the outbound message buffer */ @@ -1367,6 +1380,10 @@ /* Add to the outbound message buffer */ private void reportClose(final HRegionInfo region, final byte[] message) { outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message)); + try { + outboundMsgSignal.put(signaler); + } catch (InterruptedException e) { + } } /** @@ -1381,12 +1398,15 @@ */ void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA, HRegionInfo newRegionB) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion, - ("Daughters; " + - newRegionA.getRegionNameAsString() + ", " + - newRegionB.getRegionNameAsString()).getBytes())); - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA)); - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB)); + synchronized (outboundMsgs) { + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion, + ("Daughters; " + + newRegionA.getRegionNameAsString() + ", " + + newRegionB.getRegionNameAsString()).getBytes())); + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA)); + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB)); + outboundMsgSignal.add(signaler); + } } ////////////////////////////////////////////////////////////////////////////// @@ -1578,6 +1598,10 @@ */ public void addProcessingMessage(final HRegionInfo hri) { getOutboundMsgs().add(new HMsg(HMsg.Type.MSG_REPORT_PROCESS_OPEN, hri)); + try { + outboundMsgSignal.put(signaler); + } catch (InterruptedException e) { + } } protected void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)