diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedCallQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedCallQueueRpcExecutor.java new file mode 100644 index 0000000..cc04a60 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedCallQueueRpcExecutor.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; + +class BalancedCallQueueRpcExecutor extends RpcExecutor { + static BalancedCallQueueRpcExecutor newSingleQueueExecutor( + String name, + int handlerCount, + CallQueueFactory queueFactory, + Configuration conf, Abortable abortable) { + + if (handlerCount < 1) { + handlerCount = 1; + } + + return new BalancedCallQueueRpcExecutor( + name, handlerCount, Collections.singletonList(queueFactory.create()), + conf, abortable); + } + + static BalancedCallQueueRpcExecutor newMultiQueuesExecutor( + String name, + int handlerCount, + int queueCount, + CallQueueFactory queueFactory, + Configuration conf, Abortable abortable) { + + if (queueCount < 1) { + queueCount = 1; + } + + if (handlerCount < queueCount) { + handlerCount = queueCount; + } + + List> queues = new ArrayList>(queueCount); + for (int i = 0; i < queueCount; i++) { + queues.add(queueFactory.create()); + } + + return new BalancedCallQueueRpcExecutor(name, handlerCount, queues, conf, abortable); + } + + private final List> queues; + private final QueueBalancer balancer; + + private BalancedCallQueueRpcExecutor( + String name, int handlerCount, + List> queues, + Configuration conf, Abortable abortable) { + super(name, handlerCount, conf, abortable); + + this.queues = queues; + this.balancer = getBalancer(queues.size()); + } + + @Override + void startHandlers(int port) { + final String threadPrefix = name; + for (int i = 0; i < handlerCount; i++) { + final int index = i % queues.size(); + final CallQueue q = queues.get(index); + + startHandler(threadPrefix, index, port, q); + } + } + + @Override + public int getQueueLength() { + int length = 0; + for (CallQueue queue : queues) { + length += queue.size(); + } + return length; + } + + @Override + public void dispatch(CallRunner callTask) throws InterruptedException { + int queueIndex = balancer.getNextQueue(); + queues.get(queueIndex).put(callTask); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index 56424df..0350631 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; */ @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX }) @InterfaceStability.Evolving -public class BalancedQueueRpcExecutor extends RpcExecutor { +public class BalancedQueueRpcExecutor extends BlockingQueueRpcExecutor { protected final List> queues; private final QueueBalancer balancer; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingQueueRpcExecutor.java new file mode 100644 index 0000000..8df0b80 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingQueueRpcExecutor.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.util.List; +import java.util.concurrent.BlockingQueue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; + +import com.google.common.base.Strings; + +abstract class BlockingQueueRpcExecutor extends RpcExecutor { + BlockingQueueRpcExecutor(String name, int handlerCount) { + super(name, handlerCount); + } + + BlockingQueueRpcExecutor( + String name, int handlerCount, Configuration conf, Abortable abortable) { + super(name, handlerCount, conf, abortable); + } + + /** Returns the list of request queues */ + protected abstract List> getQueues(); + + @Override + void startHandlers(final int port) { + List> callQueues = getQueues(); + startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port); + } + + protected void startHandlers(final String nameSuffix, final int numHandlers, + final List> callQueues, + final int qindex, final int qsize, final int port) { + + final String threadPrefix = name + Strings.nullToEmpty(nameSuffix); + for (int i = 0; i < numHandlers; i++) { + final int index = qindex + (i % qsize); + final BlockingQueue q = callQueues.get(index); + + startHandler(threadPrefix, index, port, new CallQueueAdapter(q)); + } + } + + protected void consumerLoop(final BlockingQueue myQueue) { + consumerLoop(new CallQueueAdapter(myQueue)); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueue.java new file mode 100644 index 0000000..93b13f6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueue.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +interface CallQueue { + /** + * Retrieves and removes the head of this queue, waiting if necessary + * until an element becomes available. + * + * @return the head of this queue + * @throws InterruptedException if interrupted while waiting + */ + E take() throws InterruptedException; + + /** + * Inserts the specified element into this queue, waiting if necessary + * for space to become available. + * + * @param e the element to add + * @throws InterruptedException if interrupted while waiting + * @throws ClassCastException if the class of the specified element + * prevents it from being added to this queue + * @throws NullPointerException if the specified element is null + * @throws IllegalArgumentException if some property of the specified + * element prevents it from being added to this queue + */ + void put(E e) throws InterruptedException; + + + /** + * Returns the number of elements in this queue. + */ + int size(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueueAdapter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueueAdapter.java new file mode 100644 index 0000000..fc44c7d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueueAdapter.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.util.concurrent.BlockingQueue; + +class CallQueueAdapter implements CallQueue { + private final BlockingQueue queue; + + /** + * @throws NullPointerException if {@code queue} is null + */ + CallQueueAdapter(BlockingQueue queue) { + if (queue == null) { + throw new NullPointerException(); + } + + this.queue = queue; + } + + /** + * @throws InterruptedException {@inheritDoc} + */ + @Override + public E take() throws InterruptedException { + return queue.take(); + } + + /** + * @throws InterruptedException {@inheritDoc} + * @throws ClassCastException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ + @Override + public void put(E e) throws InterruptedException { + queue.put(e); + } + + @Override + public int size() { + return queue.size(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueueFactories.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueueFactories.java new file mode 100644 index 0000000..4e118dd --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueueFactories.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.util.Comparator; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; + +abstract class CallQueueFactories { + private static final Log LOG = LogFactory.getLog(CallQueueFactories.class); + + abstract CallQueueFactory getFifoCallQueueFacotry(int capacity); + abstract CallQueueFactory getPriorityCallQueueFactory( + int capacity, Comparator comparator); + + private static final CallQueueFactories basedOnSemaphore = new CallQueueFactories() { + @Override + CallQueueFactory getFifoCallQueueFacotry(final int capacity) { + return new CallQueueFactory() { + @Override + public CallQueue create() { + return new SemaphoreBasedLinkedCallQueue(capacity); + } + }; + } + + @Override + CallQueueFactory getPriorityCallQueueFactory( + final int capacity, final Comparator comparator) { + return new CallQueueFactory() { + @Override + public CallQueue create() { + return new SemaphoreBasedPriorityCallQueue(capacity, comparator); + } + }; + } + }; + + private static final CallQueueFactories basedOnBlockingQueue = new CallQueueFactories() { + @Override + CallQueueFactory getFifoCallQueueFacotry(final int capacity) { + return new CallQueueFactory() { + @Override + public CallQueue create() { + return new CallQueueAdapter(new LinkedBlockingQueue(capacity)); + } + }; + } + + @Override + CallQueueFactory getPriorityCallQueueFactory( + final int capacity, final Comparator comparator) { + return new CallQueueFactory() { + @Override + public CallQueue create() { + return new CallQueueAdapter(new BoundedPriorityBlockingQueue( + capacity, comparator)); + } + }; + } + }; + + private static final String USE_BLOCKING_QUEUE = + CallQueueFactories.class.getName() + ".useBlockingQueue"; + + static CallQueueFactories getInstance(Configuration conf) { + if (conf.getBoolean(USE_BLOCKING_QUEUE, false)) { + LOG.info("Using blocking queue based call queues."); + return basedOnBlockingQueue; + } else { + return basedOnSemaphore; + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueueFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueueFactory.java new file mode 100644 index 0000000..fb0aed1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueueFactory.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +interface CallQueueFactory { + CallQueue create(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWCallQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWCallQueueRpcExecutor.java new file mode 100644 index 0000000..61195ab --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWCallQueueRpcExecutor.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; + +import com.google.protobuf.Message; + +class RWCallQueueRpcExecutor extends RpcExecutor { + static RWCallQueueRpcExecutor newSingleQueuesExecutor( + String name, + int handlerCount, float readShare, float scanShare, + CallQueueFactory writeQueueFactory, + CallQueueFactory readQueueFactory, + CallQueueFactory scanQueueFactory, + Configuration conf, Abortable abortable) { + + if (handlerCount < 2) { + handlerCount = 2; + } + + int writeHandlerCount = calcNumWriter(handlerCount, readShare); + int readAndScanHandlers = handlerCount - writeHandlerCount; + + assert writeHandlerCount >= 1; + assert readAndScanHandlers >= 1; + + int readHandlerCount = calcNumReader(readAndScanHandlers, scanShare); + int scanHandlerCount = readAndScanHandlers - readHandlerCount; + + assert readHandlerCount >= 1; + assert scanHandlerCount >= 0; + + List> writeQueues = Collections.singletonList(writeQueueFactory.create()); + List> readQueues = Collections.singletonList(readQueueFactory.create()); + List> scanQueues; + if (scanHandlerCount == 0) { + scanQueues = Collections.emptyList(); + } else { + scanQueues = Collections.singletonList(scanQueueFactory.create()); + } + + return new RWCallQueueRpcExecutor( + name, + writeHandlerCount, readHandlerCount, scanHandlerCount, + writeQueues, readQueues, scanQueues, + conf, abortable); + } + + static RWCallQueueRpcExecutor newMultiQueuesExecutor( + String name, + int handlerCount, int queueCount, float readShare, float scanShare, + CallQueueFactory writeQueueFactory, + CallQueueFactory readQueueFactory, + CallQueueFactory scanQueueFactory, + Configuration conf, Abortable abortable) { + + if (handlerCount < 2) { + handlerCount = 2; + } + + if (queueCount < 2) { + queueCount = 2; + } + + int writeQueueCount = calcNumWriter(queueCount, readShare); + int readAndScanQueues = queueCount - writeQueueCount; + + assert writeQueueCount >= 1; + assert readAndScanQueues >= 1; + + int writeHandlerCount = calcNumWriter(handlerCount, readShare); + int readAndScanHandlers = handlerCount - writeHandlerCount; + + assert writeHandlerCount >= 1; + assert readAndScanHandlers >= 1; + + int readQueueCount = calcNumReader(readAndScanQueues, scanShare); + int scanQueueCount = readAndScanQueues - readQueueCount; + + assert readQueueCount >= 1; + assert scanQueueCount >= 0; + + int readHandlerCount; + int scanHandlerCount; + if (scanQueueCount == 0) { + readHandlerCount = readAndScanHandlers; + scanHandlerCount = 0; + } else { + readHandlerCount = calcNumReader(readAndScanHandlers, scanShare); + scanHandlerCount = readAndScanHandlers - readHandlerCount; + } + + assert readHandlerCount >= 1; + assert scanHandlerCount >= 0; + + if (writeHandlerCount < writeQueueCount) { + writeHandlerCount = writeQueueCount; + } + + if (readHandlerCount < readQueueCount) { + readHandlerCount = readQueueCount; + } + + List> writeQueues = new ArrayList>(writeQueueCount); + for (int i=0; i> readQueues = new ArrayList>(readQueueCount); + for (int i=0; i> scanQueues = new ArrayList>(scanHandlerCount); + for (int i=0; i> writeQueues; + private final List> readQueues; + private final List> scanQueues; + + private RWCallQueueRpcExecutor(String name, + int writeHandlerCount, + int readHandlerCount, + int scanHandlerCount, + List> writeQueues, + List> readQueues, + List> scanQueues, + Configuration conf, Abortable abortable) { + + super(name, writeHandlerCount + readHandlerCount + scanHandlerCount, conf, abortable); + + this.writeHandlerCount = writeHandlerCount; + this.readHandlerCount = readHandlerCount; + this.scanHandlerCount = scanHandlerCount; + + this.writeQueues = writeQueues; + this.readQueues = readQueues; + this.scanQueues = scanQueues; + + this.writeBalancer = getBalancer(writeQueues.size()); + this.readBalancer = getBalancer(readQueues.size()); + this.scanBalancer = (scanQueues.size() == 0) ? null : getBalancer(scanQueues.size()); + } + + @Override + void startHandlers(int port) { + for (int i = 0; i < writeHandlerCount; i++) { + int index = i % writeQueues.size(); + CallQueue q = writeQueues.get(index); + int qSerialIndex = index; + startHandler(name + ".write", qSerialIndex, port, q); + } + + for (int i = 0; i < readHandlerCount; i++) { + int index = i % readQueues.size(); + CallQueue q = readQueues.get(index); + int qSerialIndex = writeQueues.size() + index; + startHandler(name + ".read", qSerialIndex, port, q); + } + + for (int i = 0; i < scanHandlerCount; i++) { + int index = i % scanQueues.size(); + CallQueue q = scanQueues.get(index); + int qSerialIndex = writeQueues.size() + readQueues.size() + index; + startHandler(name + ".scan", qSerialIndex, port , q); + } + } + + @Override + public int getQueueLength() { + int length = 0; + for (CallQueue q : writeQueues) { + length += q.size(); + } + for (CallQueue q : readQueues) { + length += q.size(); + } + for (CallQueue q : scanQueues) { + length += q.size(); + } + return length; + } + + @Override + public void dispatch(CallRunner callTask) throws InterruptedException { + selectQueue(callTask).put(callTask); + } + + private CallQueue selectQueue(CallRunner callTask) { + RpcServer.Call call = callTask.getCall(); + RequestHeader header = call.getHeader(); + Message param = call.param; + + if (RWQueueRpcExecutor.isWriteRequest(header, param)) { + return writeQueues.get(writeBalancer.getNextQueue()); + } + + if (scanBalancer != null && RWQueueRpcExecutor.isScanRequest(header, param)) { + return scanQueues.get(scanBalancer.getNextQueue()); + } + + return readQueues.get(readBalancer.getNextQueue()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 1be8c65..8ebd13c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -49,7 +49,7 @@ import com.google.protobuf.Message; */ @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @InterfaceStability.Evolving -public class RWQueueRpcExecutor extends RpcExecutor { +public class RWQueueRpcExecutor extends BlockingQueueRpcExecutor { private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class); private final List> queues; @@ -173,7 +173,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { queues.get(queueIndex).put(callTask); } - private boolean isWriteRequest(final RequestHeader header, final Message param) { + static boolean isWriteRequest(final RequestHeader header, final Message param) { // TODO: Is there a better way to do this? if (param instanceof MultiRequest) { MultiRequest multi = (MultiRequest)param; @@ -206,7 +206,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { return false; } - private boolean isScanRequest(final RequestHeader header, final Message param) { + static boolean isScanRequest(final RequestHeader header, final Message param) { if (param instanceof ScanRequest) { // The first scan request will be executed as a "short read" ScanRequest request = (ScanRequest)param; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 709429d..54bfd53 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; @@ -31,6 +30,7 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; @@ -41,10 +41,10 @@ import com.google.common.base.Strings; public abstract class RpcExecutor { private static final Log LOG = LogFactory.getLog(RpcExecutor.class); - private final AtomicInteger activeHandlerCount = new AtomicInteger(0); + private final Counter activeHandlerCount = new Counter(); private final List handlers; - private final int handlerCount; - private final String name; + final int handlerCount; + final String name; private final AtomicInteger failedHandlerCount = new AtomicInteger(0); private boolean running; @@ -78,7 +78,7 @@ public abstract class RpcExecutor { } public int getActiveHandlerCount() { - return activeHandlerCount.get(); + return (int)activeHandlerCount.get(); } /** Returns the length of the pending queue */ @@ -87,36 +87,24 @@ public abstract class RpcExecutor { /** Add the request to the executor queue */ public abstract void dispatch(final CallRunner callTask) throws InterruptedException; - /** Returns the list of request queues */ - protected abstract List> getQueues(); + abstract void startHandlers(int port); - protected void startHandlers(final int port) { - List> callQueues = getQueues(); - startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port); - } - - protected void startHandlers(final String nameSuffix, final int numHandlers, - final List> callQueues, - final int qindex, final int qsize, final int port) { - final String threadPrefix = name + Strings.nullToEmpty(nameSuffix); - for (int i = 0; i < numHandlers; i++) { - final int index = qindex + (i % qsize); - Thread t = new Thread(new Runnable() { - @Override - public void run() { - consumerLoop(callQueues.get(index)); - } - }); - t.setDaemon(true); - t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() + - ",queue=" + index + ",port=" + port); - t.start(); - LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index); - handlers.add(t); - } + void startHandler(String threadPrefix, int index, int port, final CallQueue queue) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + consumerLoop(queue); + } + }); + t.setDaemon(true); + t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() + + ",queue=" + index + ",port=" + port); + t.start(); + LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index); + handlers.add(t); } - protected void consumerLoop(final BlockingQueue myQueue) { + void consumerLoop(final CallQueue myQueue) { boolean interrupted = false; double handlerFailureThreshhold = conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT, @@ -126,7 +114,7 @@ public abstract class RpcExecutor { try { CallRunner task = myQueue.take(); try { - activeHandlerCount.incrementAndGet(); + activeHandlerCount.increment(); task.run(); } catch (Throwable e) { if (e instanceof Error) { @@ -153,7 +141,7 @@ public abstract class RpcExecutor { + StringUtils.stringifyException(e)); } } finally { - activeHandlerCount.decrementAndGet(); + activeHandlerCount.decrement(); } } catch (InterruptedException e) { interrupted = true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SemaphoreBasedCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SemaphoreBasedCallQueue.java new file mode 100644 index 0000000..97b2e1d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SemaphoreBasedCallQueue.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.util.concurrent.Semaphore; + +/** + * A call queue with semaphores which are used for exclusive control. + */ +abstract class SemaphoreBasedCallQueue implements CallQueue { + /** + * Retrieves and removes the head of the internal queue. + */ + abstract E internalPoll(); + + /** + * Inserts the specified element into the internal queue. + * + * @throws ClassCastException if the class of the specified element + * prevents it from being added to the internal queue + * @throws IllegalArgumentException if some property of this element + * prevents it from being added to the internal queue + */ + abstract void internalOffer(E e); + + + private final Semaphore takeSemaphore = new Semaphore(0); + private final Semaphore putSemaphore; + + /** + * Creates an instance with the given capacity. + * @throws IllegalArgumentException if {@code capacity} is non-positive + */ + SemaphoreBasedCallQueue(int capacity) { + if (capacity <= 0) { + throw new IllegalArgumentException(); + } + this.putSemaphore = new Semaphore(capacity); + } + + /** + * @throws InterruptedException {@inheritDoc} + */ + @Override + public E take() throws InterruptedException { + takeSemaphore.acquire(); + E e = internalPoll(); + putSemaphore.release(); + return e; + } + + /** + * @throws InterruptedException {@inheritDoc} + * @throws ClassCastException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ + @Override + public void put(E e) throws InterruptedException { + if (e == null) { + throw new NullPointerException(); + } + + putSemaphore.acquire(); + + try { + internalOffer(e); + } catch (RuntimeException ex) { + putSemaphore.release(); + throw ex; + } + + takeSemaphore.release(); + } + + @Override + public int size() { + return takeSemaphore.availablePermits(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SemaphoreBasedLinkedCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SemaphoreBasedLinkedCallQueue.java new file mode 100644 index 0000000..258f333 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SemaphoreBasedLinkedCallQueue.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * A FIFO call queue based on linked nodes. + *

+ * This queue bases on CAS and its take/put methods are faster than + * those of {@code LinkedBlockingQueue} especially in heavy congestion. + */ +class SemaphoreBasedLinkedCallQueue extends SemaphoreBasedCallQueue { + private final Queue queue = new ConcurrentLinkedQueue(); + + /** + * Creates an instance with the given capacity. + * + * @throws IllegalArgumentException if {@code capacity} is non-positive + */ + SemaphoreBasedLinkedCallQueue(int capacity) { + super(capacity); + } + + @Override + E internalPoll() { + return queue.poll(); + } + + @Override + void internalOffer(E e) { + queue.offer(e); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SemaphoreBasedPriorityCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SemaphoreBasedPriorityCallQueue.java new file mode 100644 index 0000000..403f6c2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SemaphoreBasedPriorityCallQueue.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.util.Comparator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * A priority call queue. + * The elements of the queue are ordered according to a comparator + * or their natural ordering, and the least element is given by polling. + * The order of elements which have the same priority is undefined. + *

+ * This queue bases on CAS and its take/put methods are faster than + * those of {@code PriorityBlockingQueue} especially in heavy congestion. + */ +class SemaphoreBasedPriorityCallQueue extends SemaphoreBasedCallQueue { + private static abstract class Container { + final E element; + + Container(E element) { + this.element = element; + } + + abstract int addition(); + + static Container newInstance(E element) { + return new Container(element) { + @Override int addition() { return hashCode(); } + }; + } + } + + private static int compareAddition(Container c1, Container c2) { + int a1 = c1.addition(); + int a2 = c2.addition(); + return (a1 < a2) ? -1 : ((a1 == a2) ? 0 : 1); + } + + private static class ContainerComparator implements Comparator> { + final Comparator elementComparator; + + ContainerComparator(Comparator elementComparator) { + this.elementComparator = elementComparator; + } + + @Override + public int compare(Container c1, Container c2) { + int result = elementComparator.compare(c1.element, c2.element); + return (result != 0) ? result : compareAddition(c1, c2); + } + } + + private static class NaturalContainerComparator implements Comparator> { + @Override + public int compare(Container c1, Container c2) { + @SuppressWarnings("unchecked") + int result = ((Comparable) c1.element).compareTo(c2.element); + return (result != 0) ? result : compareAddition(c1, c2); + } + } + + @SuppressWarnings("rawtypes") + private static final NaturalContainerComparator NATURAL_CONTAINER_COMPARATOR = + new NaturalContainerComparator(); + + /** + * @param elementComparator null for the natural ordering + */ + private static Comparator> getContainerComparator( + Comparator elementComparator) { + + if (elementComparator == null) { + @SuppressWarnings("unchecked") + Comparator> cpr = NATURAL_CONTAINER_COMPARATOR; + return cpr; + } + + return new ContainerComparator(elementComparator); + } + + private final ConcurrentNavigableMap, Object> containerMap; + + /** + * Creates an instance with the given capacity and the given comparator. + * + * @param capacity the max count of elements in the queue + * @param comparator the comparator that will be used to order this queue. + * If null, the natural ordering of the elements will be used. + * @throws IllegalArgumentException if {@code capacity} is non-positive + */ + SemaphoreBasedPriorityCallQueue(int capacity, Comparator comparator) { + super(capacity); + containerMap = new ConcurrentSkipListMap, Object>( + getContainerComparator(comparator)); + } + + @Override + E internalPoll() { + Entry, Object> entry = containerMap.pollFirstEntry(); + if (entry == null) { + return null; + } + return entry.getKey().element; + } + + /** + * @throws ClassCastException {@inheritDoc} + */ + @Override + void internalOffer(E e) { + while (containerMap.putIfAbsent(Container.newInstance(e), Boolean.TRUE) != null) {} + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index b8e9c52..0f92ea2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; /** * A scheduler that maintains isolated handler pools for general, @@ -127,36 +126,71 @@ public class SimpleRpcScheduler extends RpcScheduler { LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues); - if (numCallQueues > 1 && callqReadShare > 0) { - // multiple read/write queues - if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { - CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); - callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues, - callqReadShare, callqScanShare, maxQueueLength, conf, abortable, - BoundedPriorityBlockingQueue.class, callPriority); + CallQueueFactories queueFactories = CallQueueFactories.getInstance(conf); + CallQueueFactory fifoQueueFactory = + queueFactories.getFifoCallQueueFacotry(maxQueueLength); + CallQueueFactory priorityQueueFactory = + queueFactories.getPriorityCallQueueFactory( + maxQueueLength, new CallPriorityComparator(conf, this.priority)); + + CallQueueFactory requestedQueueFactory; + if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { + requestedQueueFactory = priorityQueueFactory; + } else { + requestedQueueFactory = fifoQueueFactory; + } + + if (numCallQueues > 1) { + if (callqReadShare > 0) { + callExecutor = RWCallQueueRpcExecutor.newMultiQueuesExecutor( + "RW.default", handlerCount, numCallQueues, + callqReadShare, callqScanShare, + fifoQueueFactory, requestedQueueFactory, requestedQueueFactory, + conf, abortable); } else { - callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues, - callqReadShare, callqScanShare, maxQueueLength, conf, abortable); + callExecutor = BalancedCallQueueRpcExecutor.newMultiQueuesExecutor( + "B.default", handlerCount, numCallQueues, + requestedQueueFactory, + conf, abortable); } } else { - // multiple queues - if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { - CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); - callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, - conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); + if (callqReadShare > 0) { + callExecutor = RWCallQueueRpcExecutor.newSingleQueuesExecutor( + "RW.default", handlerCount, + callqReadShare, callqScanShare, + fifoQueueFactory, requestedQueueFactory, requestedQueueFactory, + conf, abortable); } else { - callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, - numCallQueues, maxQueueLength, conf, abortable); + callExecutor = BalancedCallQueueRpcExecutor.newSingleQueueExecutor( + "B.default", handlerCount, + requestedQueueFactory, + conf, abortable); } } - // Create 2 queues to help priorityExecutor be more scalable. - this.priorityExecutor = priorityHandlerCount > 0 ? - new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxQueueLength) : null; + if (priorityHandlerCount > 0) { + if (numCallQueues > 1) { + // Create 2 queues to help priorityExecutor be more scalable + // as multiplex queues are expected for more scalable. + this.priorityExecutor = BalancedCallQueueRpcExecutor.newMultiQueuesExecutor( + "Priority", priorityHandlerCount, 2, fifoQueueFactory, + conf, abortable); + } else { + this.priorityExecutor = BalancedCallQueueRpcExecutor.newSingleQueueExecutor( + "Priority", priorityHandlerCount, fifoQueueFactory, + conf, abortable); + } + } else { + this.priorityExecutor = null; + } - this.replicationExecutor = - replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication", - replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null; + if (replicationHandlerCount > 0) { + this.replicationExecutor = BalancedCallQueueRpcExecutor.newSingleQueueExecutor( + "Replication", replicationHandlerCount, fifoQueueFactory, + conf, abortable); + } else { + this.replicationExecutor = null; + } } public SimpleRpcScheduler( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSemaphoreBasedCallQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSemaphoreBasedCallQueue.java new file mode 100644 index 0000000..08b58ac --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSemaphoreBasedCallQueue.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestSemaphoreBasedCallQueue { + static class QueueImpl extends SemaphoreBasedCallQueue { + QueueImpl(int capacity) { + super(capacity); + } + + volatile String log = ""; + + @Override + String internalPoll() { + log += "poll/"; + return "poll"; + } + + @Override + void internalOffer(String e) { + log += "offer(" + e + ")/"; + } + } + + @Test(timeout=1000) + public void testPut() throws Exception { + final QueueImpl queue = new QueueImpl(2); + final AtomicBoolean assertionFailed = new AtomicBoolean(); + + Thread putThread = new Thread() { + @Override + public void run() { + try { + queue.put("a"); + queue.put("b");; + queue.put("c"); + } catch (InterruptedException e) { + assertionFailed.set(true);; + } + } + }; + + putThread.start(); + Thread.sleep(10); + Assert.assertTrue(putThread.isAlive()); + + Assert.assertEquals("poll", queue.take()); + Thread.sleep(10); + Assert.assertFalse(putThread.isAlive()); + + if (assertionFailed.get()) { + Assert.fail(); + } + + Assert.assertEquals("offer(a)/offer(b)/poll/offer(c)/", queue.log); + } + + @Test(timeout=1000) + public void testTake() throws Exception { + final QueueImpl queue = new QueueImpl(2); + final AtomicBoolean assertionFailed = new AtomicBoolean(); + + Thread takeThread = new Thread() { + @Override + public void run() { + try { + if (! "poll".equals(queue.take())) { + assertionFailed.set(true);; + } + } catch (InterruptedException e) { + assertionFailed.set(true); + } + } + }; + + takeThread.start(); + Thread.sleep(10); + Assert.assertTrue(takeThread.isAlive()); + + queue.put("a"); + Thread.sleep(10); + Assert.assertFalse(takeThread.isAlive()); + + if (assertionFailed.get()) { + Assert.fail(); + } + + Assert.assertEquals("offer(a)/poll/", queue.log); + } + + @Test(timeout=1000) + public void testSize() throws Exception { + final QueueImpl queue = new QueueImpl(2); + Assert.assertEquals(0, queue.size()); + + queue.put("a"); + Assert.assertEquals(1, queue.size()); + + queue.put("b"); + Assert.assertEquals(2, queue.size()); + + queue.take(); + Assert.assertEquals(1, queue.size()); + + queue.take(); + Assert.assertEquals(0, queue.size()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSemaphoreBasedLinkedCallQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSemaphoreBasedLinkedCallQueue.java new file mode 100644 index 0000000..d7d571f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSemaphoreBasedLinkedCallQueue.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestSemaphoreBasedLinkedCallQueue { + @Test(timeout=1000) + public void testFifo() throws Exception { + SemaphoreBasedLinkedCallQueue queue = + new SemaphoreBasedLinkedCallQueue(10); + + queue.put("a"); + queue.put("b"); + queue.put("c"); + queue.put("a"); + + Assert.assertEquals("a", queue.take()); + Assert.assertEquals("b", queue.take()); + Assert.assertEquals("c", queue.take()); + Assert.assertEquals("a", queue.take()); + + Assert.assertEquals(0, queue.size()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSemaphoreBasedPriorityCallQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSemaphoreBasedPriorityCallQueue.java new file mode 100644 index 0000000..ff01d3d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSemaphoreBasedPriorityCallQueue.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.util.Comparator; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestSemaphoreBasedPriorityCallQueue { + @Test(timeout=1000) + public void testNaturalOrdering() throws Exception { + SemaphoreBasedPriorityCallQueue queue = + new SemaphoreBasedPriorityCallQueue(10, null); + + queue.put("a"); + queue.put("b"); + queue.put("c"); + queue.put("a"); + + Assert.assertEquals("a", queue.take()); + Assert.assertEquals("a", queue.take()); + Assert.assertEquals("b", queue.take()); + Assert.assertEquals("c", queue.take()); + + Assert.assertEquals(0, queue.size()); + } + + @Test(timeout=1000) + public void testComaprator() throws Exception { + SemaphoreBasedPriorityCallQueue queue = + new SemaphoreBasedPriorityCallQueue(10, new Comparator() { + @Override + public int compare(String s1, String s2) { + return s2.compareTo(s1); + } + }); + + queue.put("a"); + queue.put("b"); + queue.put("c"); + queue.put("a"); + + Assert.assertEquals("c", queue.take()); + Assert.assertEquals("b", queue.take()); + Assert.assertEquals("a", queue.take()); + Assert.assertEquals("a", queue.take()); + + Assert.assertEquals(0, queue.size()); + } +}