diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index 8dce6ba..50e42c6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -201,13 +201,14 @@ public final class CellUtil { @Override public boolean advance() throws IOException { - if (this.cellScanner == null) { - if (!this.iterator.hasNext()) return false; - this.cellScanner = this.iterator.next().cellScanner(); + while (true) { + if (this.cellScanner == null) { + if (!this.iterator.hasNext()) return false; + this.cellScanner = this.iterator.next().cellScanner(); + } + if (this.cellScanner.advance()) return true; + this.cellScanner = null; } - if (this.cellScanner.advance()) return true; - this.cellScanner = null; - return advance(); } }; } @@ -275,11 +276,9 @@ public final class CellUtil { * inside Put, etc., keeping Cells organized by family. * @return CellScanner interface over cellIterable */ - public static CellScanner createCellScanner(final NavigableMap> map) { + public static CellScanner createCellScanner(final NavigableMap> map) { return new CellScanner() { - private final Iterator>> entries = - map.entrySet().iterator(); + private final Iterator>> entries = map.entrySet().iterator(); private Iterator currentIterator = null; private Cell currentCell; @@ -290,17 +289,18 @@ public final class CellUtil { @Override public boolean advance() { - if (this.currentIterator == null) { - if (!this.entries.hasNext()) return false; - this.currentIterator = this.entries.next().getValue().iterator(); + while(true) { + if (this.currentIterator == null) { + if (!this.entries.hasNext()) return false; + this.currentIterator = this.entries.next().getValue().iterator(); + } + if (this.currentIterator.hasNext()) { + this.currentCell = this.currentIterator.next(); + return true; + } + this.currentCell = null; + this.currentIterator = null; } - if (this.currentIterator.hasNext()) { - this.currentCell = this.currentIterator.next(); - return true; - } - this.currentCell = null; - this.currentIterator = null; - return advance(); } }; } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java index 4835cca..50063f4 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java @@ -18,6 +18,12 @@ package org.apache.hadoop.hbase; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; + import org.apache.hadoop.hbase.util.Bytes; import org.junit.Assert; import org.junit.Test; @@ -25,6 +31,250 @@ import org.junit.experimental.categories.Category; @Category(SmallTests.class) public class TestCellUtil { + /** + * CellScannable used in test. Returns a {@link TestCellScanner} + */ + private class TestCellScannable implements CellScannable { + private final int cellsCount; + TestCellScannable(final int cellsCount) { + this.cellsCount = cellsCount; + } + @Override + public CellScanner cellScanner() { + return new TestCellScanner(this.cellsCount); + } + }; + + /** + * CellScanner used in test. + */ + private class TestCellScanner implements CellScanner { + private int count = 0; + private Cell current = null; + private final int cellsCount; + + TestCellScanner(final int cellsCount) { + this.cellsCount = cellsCount; + } + + @Override + public Cell current() { + return this.current; + } + + @Override + public boolean advance() throws IOException { + if (this.count < cellsCount) { + this.current = new TestCell(this.count); + this.count++; + return true; + } + return false; + } + } + + /** + * Cell used in test. Has row only. + */ + private class TestCell implements Cell { + private final byte [] row; + + TestCell(final int i) { + this.row = Bytes.toBytes(i); + } + + @Override + public byte[] getRowArray() { + return this.row; + } + + @Override + public int getRowOffset() { + return 0; + } + + @Override + public short getRowLength() { + return (short)this.row.length; + } + + @Override + public byte[] getFamilyArray() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getFamilyOffset() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public byte getFamilyLength() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public byte[] getQualifierArray() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getQualifierOffset() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int getQualifierLength() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long getTimestamp() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public byte getTypeByte() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long getMvccVersion() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public byte[] getValueArray() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getValueOffset() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int getValueLength() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public byte[] getTagsArray() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getTagsOffset() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public byte[] getValue() { + // TODO Auto-generated method stub + return null; + } + + @Override + public byte[] getFamily() { + // TODO Auto-generated method stub + return null; + } + + @Override + public byte[] getQualifier() { + // TODO Auto-generated method stub + return null; + } + + @Override + public byte[] getRow() { + // TODO Auto-generated method stub + return null; + } + + @Override + public long getSequenceId() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int getTagsLength() { + // TODO Auto-generated method stub + return 0; + } + }; + + /** + * Was overflowing if 100k or so lists of cellscanners to return. + * @throws IOException + */ + @Test + public void testCreateCellScannerOverflow() throws IOException { + consume(doCreateCellScanner(1, 1), 1 * 1); + consume(doCreateCellScanner(3, 0), 3 * 0); + consume(doCreateCellScanner(3, 3), 3 * 3); + consume(doCreateCellScanner(0, 1), 0 * 1); + // Do big number. See HBASE-11813 for why. + final int hundredK = 100000; + consume(doCreateCellScanner(hundredK, 0), hundredK * 0); + consume(doCreateCellArray(1), 1); + consume(doCreateCellArray(0), 0); + consume(doCreateCellArray(3), 3); + List cells = new ArrayList(hundredK); + for (int i = 0; i < hundredK; i++) { + cells.add(new TestCellScannable(1)); + } + consume(CellUtil.createCellScanner(cells), hundredK * 1); + NavigableMap> m = new TreeMap>(Bytes.BYTES_COMPARATOR); + List cellArray = new ArrayList(hundredK); + for (int i = 0; i < hundredK; i++) cellArray.add(new TestCell(i)); + m.put(new byte [] {'f'}, cellArray); + consume(CellUtil.createCellScanner(m), hundredK * 1); + } + + private CellScanner doCreateCellArray(final int itemsPerList) { + Cell [] cells = new Cell [itemsPerList]; + for (int i = 0; i < itemsPerList; i++) { + cells[i] = new TestCell(i); + } + return CellUtil.createCellScanner(cells); + } + + private CellScanner doCreateCellScanner(final int listsCount, final int itemsPerList) + throws IOException { + List cells = new ArrayList(listsCount); + for (int i = 0; i < listsCount; i++) { + CellScannable cs = new CellScannable() { + @Override + public CellScanner cellScanner() { + return new TestCellScanner(itemsPerList); + } + }; + cells.add(cs); + } + return CellUtil.createCellScanner(cells); + } + + private void consume(final CellScanner scanner, final int expected) throws IOException { + int count = 0; + while (scanner.advance()) count++; + Assert.assertEquals(expected, count); + } @Test public void testOverlappingKeys() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index c364786..05fd873 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -139,7 +139,7 @@ public class CallRunner { RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught: " + StringUtils.stringifyException(e)); } finally { - // regardless if succesful or not we need to reset the callQueueSize + // regardless if successful or not we need to reset the callQueueSize this.rpcServer.addCallSize(call.getSize() * -1); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 2debe2e..5b13b1d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -126,21 +126,21 @@ public class SimpleRpcScheduler extends RpcScheduler { // multiple read/write queues if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); - callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues, + callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues, callqReadShare, callqScanShare, maxQueueLength, BoundedPriorityBlockingQueue.class, callPriority); } else { - callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues, + callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues, callqReadShare, callqScanShare, maxQueueLength); } } else { // multiple queues if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); - callExecutor = new BalancedQueueRpcExecutor("default", handlerCount, numCallQueues, + callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); } else { - callExecutor = new BalancedQueueRpcExecutor("default", handlerCount, + callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, maxQueueLength); } }