Index: CHANGES.txt =================================================================== --- CHANGES.txt (revision 556221) +++ CHANGES.txt (working copy) @@ -59,3 +59,4 @@ 35. HADOOP-1375 a simple parser for hbase (Edward Yoon via Stack) 36. HADOOP-1600 Update license in HBase code 37. HADOOP-1589 Exception handling in HBase is broken over client server + 38. HADOOP-1615 Custom message queue is replaced with java.util.concurrent.BlockingQueue Index: src/java/org/apache/hadoop/hbase/HMaster.java =================================================================== --- src/java/org/apache/hadoop/hbase/HMaster.java (revision 556221) +++ src/java/org/apache/hadoop/hbase/HMaster.java (working copy) @@ -25,7 +25,6 @@ import java.io.UnsupportedEncodingException; import java.util.Collections; import java.util.Iterator; -import java.util.LinkedList; import java.util.Map; import java.util.Random; import java.util.SortedMap; @@ -35,6 +34,9 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.Vector; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -83,8 +85,8 @@ int numRetries; long maxRegionOpenTime; - LinkedList msgQueue; - + BlockingQueue msgQueue; + private Leases serverLeases; private Server server; private HServerAddress address; @@ -635,7 +637,7 @@ this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); this.numRetries = conf.getInt("hbase.client.retries.number", 2); this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000); - this.msgQueue = new LinkedList(); + this.msgQueue = new LinkedBlockingQueue(); this.serverLeases = new Leases( conf.getLong("hbase.master.lease.period", 30 * 1000), conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000)); @@ -736,38 +738,33 @@ } // Main processing loop - for (PendingOperation op = null; !closed; ) { - synchronized(msgQueue) { - while(msgQueue.size() == 0 && !closed) { - try { - msgQueue.wait(threadWakeFrequency); - } catch(InterruptedException iex) { - // continue - } - } - if(closed) { - continue; - } - op = msgQueue.removeFirst(); + PendingOperation op = null; + + while(!closed) { + try { + op = msgQueue.poll(1000, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("Wait on msgQueue was interrupted: " + e.getMessage()); } + + if (op == null || closed) continue; + try { - if (LOG.isDebugEnabled()) { - LOG.debug("Processing " + op.toString()); - } + if (LOG.isDebugEnabled()) LOG.debug("Processing " + op.toString()); op.process(); - } catch (Exception ex) { if (ex instanceof RemoteException) { try { ex = RemoteExceptionHandler.decodeRemoteException((RemoteException) ex); - } catch (IOException e) { LOG.warn(e); } } LOG.warn(ex); - synchronized(msgQueue) { - msgQueue.addLast(op); + try { + msgQueue.put(op); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } } @@ -881,10 +878,11 @@ // name, then we can timeout the old one right away and register // the new one. storedInfo = serversToServerInfo.remove(s); - if(storedInfo != null && !closed) { - synchronized(msgQueue) { - msgQueue.addLast(new PendingServerShutdown(storedInfo)); - msgQueue.notifyAll(); + if (storedInfo != null && !closed) { + try { + msgQueue.put(new PendingServerShutdown(storedInfo)); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } @@ -1070,10 +1068,10 @@ } // Queue up an update to note the region location. - - synchronized(msgQueue) { - msgQueue.addLast(new PendingOpenReport(info, region)); - msgQueue.notifyAll(); + try { + msgQueue.put(new PendingOpenReport(info, region)); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } break; @@ -1104,9 +1102,10 @@ unassignedRegions.remove(region.regionName); assignAttempts.remove(region.regionName); - synchronized(msgQueue) { - msgQueue.addLast(new PendingCloseReport(region, reassignRegion, deleteRegion)); - msgQueue.notifyAll(); + try { + msgQueue.put(new PendingCloseReport(region, reassignRegion, deleteRegion)); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } // NOTE: we cannot put the region into unassignedRegions as that @@ -2386,9 +2385,10 @@ HGlobals.rootRegionInfo); assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L); } - synchronized(msgQueue) { - msgQueue.addLast(new PendingServerShutdown(storedInfo)); - msgQueue.notifyAll(); + try { + msgQueue.put(new PendingServerShutdown(storedInfo)); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } }