From 797f06bfd97d0c29a0f624b9bf65b3bc5f522054 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Mon, 5 Aug 2013 21:36:38 -0700 Subject: [PATCH] Revert scanner pre-fetching. --- .../apache/hadoop/hbase/client/ClientScanner.java | 44 +-- .../java/org/apache/hadoop/hbase/client/Scan.java | 20 - .../hadoop/hbase/client/ScannerCallable.java | 3 +- .../apache/hadoop/hbase/protobuf/ProtobufUtil.java | 10 - .../java/org/apache/hadoop/hbase/util/Threads.java | 41 +- .../hbase/protobuf/generated/ClientProtos.java | 217 +++-------- hbase-protocol/src/main/protobuf/Client.proto | 2 - .../apache/hadoop/hbase/regionserver/HRegion.java | 2 +- .../hadoop/hbase/regionserver/HRegionServer.java | 218 ++++++----- .../hbase/regionserver/RegionScannerHolder.java | 394 -------------------- .../hbase/client/TestScannersFromClientSide.java | 153 ++------ .../coprocessor/TestRowProcessorEndpoint.java | 4 +- .../hadoop/hbase/protobuf/TestProtobufUtil.java | 1 - .../regionserver/TestRegionServerMetrics.java | 1 - 14 files changed, 232 insertions(+), 878 deletions(-) delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index bcc0060..4fdc6ce 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -149,7 +149,7 @@ public class ClientScanner extends AbstractClientScanner { this.caller = rpcFactory. newCaller(); // initialize the scanner - nextScanner(false); + nextScanner(this.caching, false); } protected HConnection getConnection() { @@ -190,9 +190,10 @@ public class ClientScanner extends AbstractClientScanner { * scanner at the scan.getStartRow(). We will go no further, just tidy * up outstanding scanners, if currentRegion != null and * done is true. + * @param nbRows * @param done Server-side says we're done scanning. */ - private boolean nextScanner(final boolean done) + private boolean nextScanner(int nbRows, final boolean done) throws IOException { // Close the previous scanner if it's open if (this.callable != null) { @@ -231,7 +232,7 @@ public class ClientScanner extends AbstractClientScanner { Bytes.toStringBinary(localStartKey) + "'"); } try { - callable = getScannerCallable(localStartKey); + callable = getScannerCallable(localStartKey, nbRows); // Open a scanner on the region server starting at the // beginning of the region this.caller.callWithRetries(callable); @@ -246,11 +247,12 @@ public class ClientScanner extends AbstractClientScanner { return true; } - protected ScannerCallable getScannerCallable(byte [] localStartKey) { + protected ScannerCallable getScannerCallable(byte [] localStartKey, + int nbRows) { scan.setStartRow(localStartKey); ScannerCallable s = new ScannerCallable(getConnection(), getTableName(), scan, this.scanMetrics); - s.setCaching(this.caching); + s.setCaching(nbRows); return s; } @@ -284,13 +286,23 @@ public class ClientScanner extends AbstractClientScanner { Result [] values = null; long remainingResultSize = maxScannerResultSize; int countdown = this.caching; - + // We need to reset it if it's a new callable that was created + // with a countdown in nextScanner + callable.setCaching(this.caching); // This flag is set when we want to skip the result returned. We do // this when we reset scanner because it split under us. 
boolean skipFirst = false; boolean retryAfterOutOfOrderException = true; do { try { + if (skipFirst) { + // Skip only the first row (which was the last row of the last + // already-processed batch). + callable.setCaching(1); + values = this.caller.callWithRetries(callable); + callable.setCaching(this.caching); + skipFirst = false; + } // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just // exhausted current region. @@ -360,15 +372,7 @@ public class ClientScanner extends AbstractClientScanner { } lastNext = currentTime; if (values != null && values.length > 0) { - int i = 0; - if (skipFirst) { - skipFirst = false; - // We will cache one row less, which is fine - countdown--; - i = 1; - } - for (; i < values.length; i++) { - Result rs = values[i]; + for (Result rs : values) { cache.add(rs); for (KeyValue kv : rs.raw()) { remainingResultSize -= kv.heapSize(); @@ -378,7 +382,7 @@ public class ClientScanner extends AbstractClientScanner { } } // Values == null means server-side filter has determined we must STOP - } while (remainingResultSize > 0 && countdown > 0 && nextScanner(values == null)); + } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null)); } if (cache.size() > 0) { @@ -431,12 +435,4 @@ public class ClientScanner extends AbstractClientScanner { } closed = true; } - - long currentScannerId() { - return (callable == null) ? -1L : callable.scannerId; - } - - HRegionInfo currentRegionInfo() { - return currentRegion; - } } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index de2e0cc..af45616 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -116,8 +116,6 @@ public class Scan extends OperationWithAttributes { new TreeMap>(Bytes.BYTES_COMPARATOR); private Boolean loadColumnFamiliesOnDemand = null; - private boolean prefetching = true; - /** * Create a Scan operation across all rows. */ @@ -170,7 +168,6 @@ public class Scan extends OperationWithAttributes { getScan = scan.isGetScan(); filter = scan.getFilter(); // clone? loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue(); - prefetching = scan.getPrefetching(); TimeRange ctr = scan.getTimeRange(); tr = new TimeRange(ctr.getMin(), ctr.getMax()); Map> fams = scan.getFamilyMap(); @@ -204,7 +201,6 @@ public class Scan extends OperationWithAttributes { this.storeOffset = get.getRowOffsetPerColumnFamily(); this.tr = get.getTimeRange(); this.familyMap = get.getFamilyMap(); - this.prefetching = false; this.getScan = true; } @@ -368,21 +364,6 @@ public class Scan extends OperationWithAttributes { } /** - * Set if pre-fetching is enabled. If enabled, the region - * server will try to read the next scan result ahead of time. This - * improves scan performance if we are doing large scans. - * - * @param enablePrefetching if pre-fetching is enabled or not - */ - public void setPrefetching(boolean enablePrefetching) { - this.prefetching = enablePrefetching; - } - - public boolean getPrefetching() { - return prefetching; - } - -/** * @return the maximum result size in bytes. 
See {@link #setMaxResultSize(long)} */ public long getMaxResultSize() { @@ -632,7 +613,6 @@ public class Scan extends OperationWithAttributes { map.put("maxResultSize", this.maxResultSize); map.put("cacheBlocks", this.cacheBlocks); map.put("loadColumnFamiliesOnDemand", this.loadColumnFamiliesOnDemand); - map.put("prefetching", this.prefetching); List timeRange = new ArrayList(); timeRange.add(this.tr.getMin()); timeRange.add(this.tr.getMax()); diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 42e655c..c701e37 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -61,7 +61,7 @@ public class ScannerCallable extends RegionServerCallable { public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity"; public static final Log LOG = LogFactory.getLog(ScannerCallable.class); - long scannerId = -1L; + private long scannerId = -1L; private boolean instantiated = false; private boolean closed = false; private Scan scan; @@ -137,7 +137,6 @@ public class ScannerCallable extends RegionServerCallable { /** * @see java.util.concurrent.Callable#call() */ - @SuppressWarnings("deprecation") public Result [] call() throws IOException { if (closed) { if (scannerId != -1) { diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 1a64f1f..3c6ab5d 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -713,9 +713,6 @@ public final class ProtobufUtil { if (scan.getBatch() > 0) { scanBuilder.setBatchSize(scan.getBatch()); } - if (scan.getCaching() > 0) { - scanBuilder.setCachingCount(scan.getCaching()); - } if (scan.getMaxResultSize() > 0) { scanBuilder.setMaxResultSize(scan.getMaxResultSize()); } @@ -723,7 +720,6 @@ public final class ProtobufUtil { if (loadColumnFamiliesOnDemand != null) { scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand.booleanValue()); } - scanBuilder.setPrefetching(scan.getPrefetching()); scanBuilder.setMaxVersions(scan.getMaxVersions()); TimeRange timeRange = scan.getTimeRange(); if (!timeRange.isAllTime()) { @@ -801,9 +797,6 @@ public final class ProtobufUtil { if (proto.hasMaxVersions()) { scan.setMaxVersions(proto.getMaxVersions()); } - if (proto.hasPrefetching()) { - scan.setPrefetching(proto.getPrefetching()); - } if (proto.hasStoreLimit()) { scan.setMaxResultsPerColumnFamily(proto.getStoreLimit()); } @@ -832,9 +825,6 @@ public final class ProtobufUtil { if (proto.hasBatchSize()) { scan.setBatch(proto.getBatchSize()); } - if (proto.hasCachingCount()) { - scan.setCaching(proto.getCachingCount()); - } if (proto.hasMaxResultSize()) { scan.setMaxResultSize(proto.getMaxResultSize()); } diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index 7ace6b7..46a9292 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -22,9 +22,6 @@ import java.io.InterruptedIOException; import java.io.PrintWriter; import java.lang.Thread.UncaughtExceptionHandler; import 
java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -186,42 +183,8 @@ public class Threads { boundedCachedThreadPool.allowCoreThreadTimeOut(true); return boundedCachedThreadPool; } - - /** - * Creates a ThreadPoolExecutor which has a bound on the number of tasks that can be - * submitted to it, determined by the blockingLimit parameter. Excess tasks - * submitted will block on the calling thread till space frees up. - * - * @param blockingLimit max number of tasks that can be submitted - * @param timeout time value after which unused threads are killed - * @param unit time unit for killing unused threads - * @param threadFactory thread factory to use to spawn threads - * @return the ThreadPoolExecutor - */ - public static ThreadPoolExecutor getBlockingThreadPool( - int blockingLimit, long timeout, TimeUnit unit, - ThreadFactory threadFactory) { - ThreadPoolExecutor blockingThreadPool = - new ThreadPoolExecutor( - 1, blockingLimit, timeout, TimeUnit.SECONDS, - new SynchronousQueue(), - threadFactory, - new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - try { - // The submitting thread will block until the thread pool frees up. - executor.getQueue().put(r); - } catch (InterruptedException e) { - throw new RejectedExecutionException( - "Failed to requeue the rejected request because of ", e); - } - } - }); - blockingThreadPool.allowCoreThreadTimeOut(true); - return blockingThreadPool; - } - + + /** * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely, * with a common prefix. 
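For readers tracing the revert: the getBlockingThreadPool helper deleted above implemented caller-blocking backpressure — a pool over a SynchronousQueue whose RejectedExecutionHandler re-queues the rejected task with a blocking put(), stalling the submitting thread until a worker frees up. Below is a minimal standalone sketch of that pattern, for reference only; the class name and main() driver are illustrative, not HBase API, and unlike the deleted helper (which passed TimeUnit.SECONDS to the executor regardless of its unit argument) the sketch honors the caller's unit.

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerBlocksPoolSketch {
  static ThreadPoolExecutor newCallerBlocksPool(int maxThreads, long timeout, TimeUnit unit) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, maxThreads, timeout, unit,
        // SynchronousQueue has zero capacity: submitting while all workers
        // are busy triggers the rejection handler below.
        new SynchronousQueue<Runnable>(),
        (r, executor) -> {
          try {
            // Block the submitting thread until an idle worker takes the task.
            executor.getQueue().put(r);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting to enqueue task", e);
          }
        });
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor pool = newCallerBlocksPool(2, 60, TimeUnit.SECONDS);
    for (int i = 0; i < 4; i++) { // tasks 3 and 4 briefly block the submitter
      final int n = i;
      pool.execute(() -> System.out.println("task " + n + " on " + Thread.currentThread().getName()));
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}

The trade-off of this pattern is that submission latency, rather than a queue, absorbs bursts: per the deleted javadoc, at most blockingLimit tasks can be outstanding and excess submissions simply wait on the calling thread, which is how the reverted prefetch path bounded concurrent scanner prefetches.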
diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 6d8dce7..a5acc3c 100644 --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -10625,14 +10625,6 @@ public final class ClientProtos { // optional bool load_column_families_on_demand = 13; boolean hasLoadColumnFamiliesOnDemand(); boolean getLoadColumnFamiliesOnDemand(); - - // optional uint32 caching_count = 14; - boolean hasCachingCount(); - int getCachingCount(); - - // optional bool prefetching = 15; - boolean hasPrefetching(); - boolean getPrefetching(); } public static final class Scan extends com.google.protobuf.GeneratedMessage @@ -10821,26 +10813,6 @@ public final class ClientProtos { return loadColumnFamiliesOnDemand_; } - // optional uint32 caching_count = 14; - public static final int CACHING_COUNT_FIELD_NUMBER = 14; - private int cachingCount_; - public boolean hasCachingCount() { - return ((bitField0_ & 0x00000800) == 0x00000800); - } - public int getCachingCount() { - return cachingCount_; - } - - // optional bool prefetching = 15; - public static final int PREFETCHING_FIELD_NUMBER = 15; - private boolean prefetching_; - public boolean hasPrefetching() { - return ((bitField0_ & 0x00001000) == 0x00001000); - } - public boolean getPrefetching() { - return prefetching_; - } - private void initFields() { column_ = java.util.Collections.emptyList(); attribute_ = java.util.Collections.emptyList(); @@ -10855,8 +10827,6 @@ public final class ClientProtos { storeLimit_ = 0; storeOffset_ = 0; loadColumnFamiliesOnDemand_ = false; - cachingCount_ = 0; - prefetching_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -10927,12 +10897,6 @@ public final class ClientProtos { if (((bitField0_ & 0x00000400) == 0x00000400)) { output.writeBool(13, loadColumnFamiliesOnDemand_); } - if (((bitField0_ & 0x00000800) == 0x00000800)) { - output.writeUInt32(14, cachingCount_); - } - if (((bitField0_ & 0x00001000) == 0x00001000)) { - output.writeBool(15, prefetching_); - } getUnknownFields().writeTo(output); } @@ -10994,14 +10958,6 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(13, loadColumnFamiliesOnDemand_); } - if (((bitField0_ & 0x00000800) == 0x00000800)) { - size += com.google.protobuf.CodedOutputStream - .computeUInt32Size(14, cachingCount_); - } - if (((bitField0_ & 0x00001000) == 0x00001000)) { - size += com.google.protobuf.CodedOutputStream - .computeBoolSize(15, prefetching_); - } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -11084,16 +11040,6 @@ public final class ClientProtos { result = result && (getLoadColumnFamiliesOnDemand() == other.getLoadColumnFamiliesOnDemand()); } - result = result && (hasCachingCount() == other.hasCachingCount()); - if (hasCachingCount()) { - result = result && (getCachingCount() - == other.getCachingCount()); - } - result = result && (hasPrefetching() == other.hasPrefetching()); - if (hasPrefetching()) { - result = result && (getPrefetching() - == other.getPrefetching()); - } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -11155,14 +11101,6 @@ public final class ClientProtos { hash = (37 * hash) + LOAD_COLUMN_FAMILIES_ON_DEMAND_FIELD_NUMBER; hash = 
(53 * hash) + hashBoolean(getLoadColumnFamiliesOnDemand()); } - if (hasCachingCount()) { - hash = (37 * hash) + CACHING_COUNT_FIELD_NUMBER; - hash = (53 * hash) + getCachingCount(); - } - if (hasPrefetching()) { - hash = (37 * hash) + PREFETCHING_FIELD_NUMBER; - hash = (53 * hash) + hashBoolean(getPrefetching()); - } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -11325,10 +11263,6 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000800); loadColumnFamiliesOnDemand_ = false; bitField0_ = (bitField0_ & ~0x00001000); - cachingCount_ = 0; - bitField0_ = (bitField0_ & ~0x00002000); - prefetching_ = false; - bitField0_ = (bitField0_ & ~0x00004000); return this; } @@ -11437,14 +11371,6 @@ public final class ClientProtos { to_bitField0_ |= 0x00000400; } result.loadColumnFamiliesOnDemand_ = loadColumnFamiliesOnDemand_; - if (((from_bitField0_ & 0x00002000) == 0x00002000)) { - to_bitField0_ |= 0x00000800; - } - result.cachingCount_ = cachingCount_; - if (((from_bitField0_ & 0x00004000) == 0x00004000)) { - to_bitField0_ |= 0x00001000; - } - result.prefetching_ = prefetching_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -11546,12 +11472,6 @@ public final class ClientProtos { if (other.hasLoadColumnFamiliesOnDemand()) { setLoadColumnFamiliesOnDemand(other.getLoadColumnFamiliesOnDemand()); } - if (other.hasCachingCount()) { - setCachingCount(other.getCachingCount()); - } - if (other.hasPrefetching()) { - setPrefetching(other.getPrefetching()); - } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -11676,16 +11596,6 @@ public final class ClientProtos { loadColumnFamiliesOnDemand_ = input.readBool(); break; } - case 112: { - bitField0_ |= 0x00002000; - cachingCount_ = input.readUInt32(); - break; - } - case 120: { - bitField0_ |= 0x00004000; - prefetching_ = input.readBool(); - break; - } } } } @@ -12439,48 +12349,6 @@ public final class ClientProtos { return this; } - // optional uint32 caching_count = 14; - private int cachingCount_ ; - public boolean hasCachingCount() { - return ((bitField0_ & 0x00002000) == 0x00002000); - } - public int getCachingCount() { - return cachingCount_; - } - public Builder setCachingCount(int value) { - bitField0_ |= 0x00002000; - cachingCount_ = value; - onChanged(); - return this; - } - public Builder clearCachingCount() { - bitField0_ = (bitField0_ & ~0x00002000); - cachingCount_ = 0; - onChanged(); - return this; - } - - // optional bool prefetching = 15; - private boolean prefetching_ ; - public boolean hasPrefetching() { - return ((bitField0_ & 0x00004000) == 0x00004000); - } - public boolean getPrefetching() { - return prefetching_; - } - public Builder setPrefetching(boolean value) { - bitField0_ |= 0x00004000; - prefetching_ = value; - onChanged(); - return this; - } - public Builder clearPrefetching() { - bitField0_ = (bitField0_ & ~0x00004000); - prefetching_ = false; - onChanged(); - return this; - } - // @@protoc_insertion_point(builder_scope:Scan) } @@ -21603,7 +21471,7 @@ public final class ClientProtos { "gion\030\001 \002(\0132\020.RegionSpecifier\022 \n\010mutation" + "\030\002 \002(\0132\016.MutationProto\022\035\n\tcondition\030\003 \001(" + "\0132\n.Condition\"<\n\016MutateResponse\022\027\n\006resul" + - "t\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\201\003\n\004" + + "t\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\325\002\n\004" + "Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tattribu" + "te\030\002 
\003(\0132\016.NameBytesPair\022\021\n\tstart_row\030\003 " + "\001(\014\022\020\n\010stop_row\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007.", @@ -21612,47 +21480,46 @@ public final class ClientProtos { "\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017max_r" + "esult_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(\r\022\024\n" + "\014store_offset\030\014 \001(\r\022&\n\036load_column_famil" + - "ies_on_demand\030\r \001(\010\022\025\n\rcaching_count\030\016 \001" + - "(\r\022\023\n\013prefetching\030\017 \001(\010\"\236\001\n\013ScanRequest\022" + - " \n\006region\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004sca" + - "n\030\002 \001(\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016nu" + - "mber_of_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(", - "\010\022\025\n\rnext_call_seq\030\006 \001(\004\"p\n\014ScanResponse" + - "\022)\n\020result_cell_meta\030\001 \001(\0132\017.ResultCellM" + - "eta\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030" + - "\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"&\n\016ResultCellMeta\022\024\n\014" + - "cells_length\030\001 \003(\r\"\263\001\n\024BulkLoadHFileRequ" + - "est\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\0225\n" + - "\013family_path\030\002 \003(\0132 .BulkLoadHFileReques" + - "t.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n\n" + - "FamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t" + - "\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(", - "\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(\014" + - "\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030\003 " + - "\002(\t\022\017\n\007request\030\004 \002(\014\"d\n\031CoprocessorServi" + - "ceRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif" + - "ier\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCa" + - "ll\"]\n\032CoprocessorServiceResponse\022 \n\006regi" + - "on\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(" + - "\0132\016.NameBytesPair\"B\n\013MultiAction\022 \n\010muta" + - "tion\030\001 \001(\0132\016.MutationProto\022\021\n\003get\030\002 \001(\0132" + - "\004.Get\"I\n\014ActionResult\022\026\n\005value\030\001 \001(\0132\007.R", - "esult\022!\n\texception\030\002 \001(\0132\016.NameBytesPair" + - "\"^\n\014MultiRequest\022 \n\006region\030\001 \002(\0132\020.Regio" + - "nSpecifier\022\034\n\006action\030\002 \003(\0132\014.MultiAction" + - "\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiResponse\022\035\n\006res" + - "ult\030\001 \003(\0132\r.ActionResult2\342\002\n\rClientServi" + - "ce\022 \n\003Get\022\013.GetRequest\032\014.GetResponse\022/\n\010" + - "MultiGet\022\020.MultiGetRequest\032\021.MultiGetRes" + - "ponse\022)\n\006Mutate\022\016.MutateRequest\032\017.Mutate" + - "Response\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRes" + - "ponse\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileRe", - "quest\032\026.BulkLoadHFileResponse\022F\n\013ExecSer" + - "vice\022\032.CoprocessorServiceRequest\032\033.Copro" + - "cessorServiceResponse\022&\n\005Multi\022\r.MultiRe" + - "quest\032\016.MultiResponseBB\n*org.apache.hado" + - "op.hbase.protobuf.generatedB\014ClientProto" + - "sH\001\210\001\001\240\001\001" + "ies_on_demand\030\r \001(\010\"\236\001\n\013ScanRequest\022 \n\006r" + + "egion\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 " + + "\001(\0132\005.Scan\022\022\n\nscanner_id\030\003 
\001(\004\022\026\n\016number" + + "_of_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n" + + "\rnext_call_seq\030\006 \001(\004\"p\n\014ScanResponse\022)\n\020", + "result_cell_meta\030\001 \001(\0132\017.ResultCellMeta\022" + + "\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(" + + "\010\022\013\n\003ttl\030\004 \001(\r\"&\n\016ResultCellMeta\022\024\n\014cell" + + "s_length\030\001 \003(\r\"\263\001\n\024BulkLoadHFileRequest\022" + + " \n\006region\030\001 \002(\0132\020.RegionSpecifier\0225\n\013fam" + + "ily_path\030\002 \003(\0132 .BulkLoadHFileRequest.Fa" + + "milyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n\nFami" + + "lyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025" + + "BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"a\n" + + "\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014", + "service_name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022" + + "\017\n\007request\030\004 \002(\014\"d\n\031CoprocessorServiceRe" + + "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" + + "%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCall\"]" + + "\n\032CoprocessorServiceResponse\022 \n\006region\030\001" + + " \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\0132\016." + + "NameBytesPair\"B\n\013MultiAction\022 \n\010mutation" + + "\030\001 \001(\0132\016.MutationProto\022\021\n\003get\030\002 \001(\0132\004.Ge" + + "t\"I\n\014ActionResult\022\026\n\005value\030\001 \001(\0132\007.Resul" + + "t\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"^\n\014", + "MultiRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpe" + + "cifier\022\034\n\006action\030\002 \003(\0132\014.MultiAction\022\016\n\006" + + "atomic\030\003 \001(\010\".\n\rMultiResponse\022\035\n\006result\030" + + "\001 \003(\0132\r.ActionResult2\342\002\n\rClientService\022 " + + "\n\003Get\022\013.GetRequest\032\014.GetResponse\022/\n\010Mult" + + "iGet\022\020.MultiGetRequest\032\021.MultiGetRespons" + + "e\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateResp" + + "onse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRespons" + + "e\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReques" + + "t\032\026.BulkLoadHFileResponse\022F\n\013ExecService", + "\022\032.CoprocessorServiceRequest\032\033.Coprocess" + + "orServiceResponse\022&\n\005Multi\022\r.MultiReques" + + "t\032\016.MultiResponseBB\n*org.apache.hadoop.h" + + "base.protobuf.generatedB\014ClientProtosH\001\210" + + "\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -21768,7 +21635,7 @@ public final class ClientProtos { internal_static_Scan_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_Scan_descriptor, - new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "CachingCount", "Prefetching", }, + new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", }, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.class, 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.Builder.class); internal_static_ScanRequest_descriptor = diff --git hbase-protocol/src/main/protobuf/Client.proto hbase-protocol/src/main/protobuf/Client.proto index 2d0892f..ba8708f 100644 --- hbase-protocol/src/main/protobuf/Client.proto +++ hbase-protocol/src/main/protobuf/Client.proto @@ -236,8 +236,6 @@ message Scan { optional uint32 store_limit = 11; optional uint32 store_offset = 12; optional bool load_column_families_on_demand = 13; /* DO NOT add defaults to load_column_families_on_demand. */ - optional uint32 caching_count = 14; - optional bool prefetching = 15; } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index cadcae7..9939f6a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -183,7 +183,6 @@ import com.google.protobuf.Service; * defines the keyspace for this HRegion. */ @InterfaceAudience.Private -@SuppressWarnings("deprecation") public class HRegion implements HeapSize { // , Writable{ public static final Log LOG = LogFactory.getLog(HRegion.class); @@ -3543,6 +3542,7 @@ public class HRegion implements HeapSize { // , Writable{ return returnResult; } + private void populateFromJoinedHeap(List results, int limit) throws IOException { assert joinedContinuationRow != null; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 3ddbfe8..3c44147 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -45,8 +45,6 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.management.ObjectName; @@ -61,7 +59,6 @@ import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.Chore; -import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; @@ -71,6 +68,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HealthCheckChore; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; @@ -162,8 +160,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; @@ -478,11 +476,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa private TableLockManager tableLockManager; /** - * Threadpool for doing scanner prefetches - */ - protected ThreadPoolExecutor scanPrefetchThreadPool; - - /** * Starts a HRegionServer at the default location * * @param conf @@ -632,18 +625,14 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } RegionScanner getScanner(long scannerId) { - RegionScannerHolder scannerHolder = getScannerHolder(scannerId); + String scannerIdString = Long.toString(scannerId); + RegionScannerHolder scannerHolder = scanners.get(scannerIdString); if (scannerHolder != null) { - return scannerHolder.scanner; + return scannerHolder.s; } return null; } - public RegionScannerHolder getScannerHolder(long scannerId) { - String scannerIdString = Long.toString(scannerId); - return scanners.get(scannerIdString); - } - /** * All initialization needed before we go register with Master. * @@ -860,11 +849,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa if (this.thriftServer != null) this.thriftServer.shutdown(); this.leases.closeAfterLeasesExpire(); this.rpcServer.stop(); - - if (scanPrefetchThreadPool != null) { - // shutdown the prefetch threads - scanPrefetchThreadPool.shutdownNow(); - } if (this.splitLogWorker != null) { splitLogWorker.stop(); } @@ -1134,7 +1118,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa // exception next time they come in. for (Map.Entry e : this.scanners.entrySet()) { try { - e.getValue().closeScanner(); + e.getValue().s.close(); } catch (IOException ioe) { LOG.warn("Closing scanner " + e.getKey(), ioe); } @@ -1559,14 +1543,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa this.replicationSinkHandler.startReplicationService(); } - // start the scanner prefetch threadpool - int numHandlers = conf.getInt("hbase.regionserver.prefetcher.threads.max", - conf.getInt("hbase.regionserver.handler.count", 10) - + conf.getInt("hbase.regionserver.metahandler.count", 10)); - scanPrefetchThreadPool = - Threads.getBlockingThreadPool(numHandlers, 60, TimeUnit.SECONDS, - new DaemonThreadFactory(RegionScannerHolder.PREFETCHER_THREAD_PREFIX)); - // Start Server. This service is like leases in that it internally runs // a thread. 
this.rpcServer.start(); @@ -2372,7 +2348,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa public void leaseExpired() { RegionScannerHolder rsh = scanners.remove(this.scannerName); if (rsh != null) { - RegionScanner s = rsh.scanner; + RegionScanner s = rsh.s; LOG.info("Scanner " + this.scannerName + " lease expired on region " + s.getRegionInfo().getRegionNameAsString()); try { @@ -2381,7 +2357,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa region.getCoprocessorHost().preScannerClose(s); } - rsh.closeScanner(); + s.close(); if (region != null && region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerClose(s); } @@ -2686,22 +2662,20 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa return this.fsOk; } - protected RegionScannerHolder addScanner( - RegionScanner s, HRegion r) throws LeaseStillHeldException { - RegionScannerHolder holder = new RegionScannerHolder(this, s, r); - String scannerName = null; + protected long addScanner(RegionScanner s) throws LeaseStillHeldException { long scannerId = -1; while (true) { - scannerId = nextLong(); - scannerName = String.valueOf(scannerId); - RegionScannerHolder existing = scanners.putIfAbsent(scannerName, holder); + scannerId = rand.nextLong(); + if (scannerId == -1) continue; + String scannerName = String.valueOf(scannerId); + RegionScannerHolder existing = scanners.putIfAbsent(scannerName, new RegionScannerHolder(s)); if (existing == null) { - holder.scannerName = scannerName; this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, - new ScannerListener(scannerName)); - return holder; + new ScannerListener(scannerName)); + break; } } + return scannerId; } /** @@ -2963,6 +2937,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa @Override public ScanResponse scan(final RpcController controller, final ScanRequest request) throws ServiceException { + Leases.Lease lease = null; String scannerName = null; try { if (!request.hasScannerId() && !request.hasScan()) { @@ -3013,10 +2988,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa throw new UnknownScannerException( "Name: " + scannerName + ", already closed?"); } - scanner = rsh.scanner; - // Use the region found in the online region list, - // not that one in the RegionScannerHolder. So that we can - // make sure the region is still open in this region server. 
+ scanner = rsh.s; region = getRegion(scanner.getRegionInfo().getRegionName()); } else { region = getRegion(request.getRegion()); @@ -3027,6 +2999,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa if (!isLoadingCfsOnDemandSet) { scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); } + byte[] hasMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE); region.prepareScanner(scan); if (region.getCoprocessorHost() != null) { scanner = region.getCoprocessorHost().preScannerOpen(scan); @@ -3037,14 +3010,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa if (region.getCoprocessorHost() != null) { scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); } - rsh = addScanner(scanner, region); - scannerName = rsh.scannerName; - scannerId = Long.parseLong(scannerName); - + scannerId = addScanner(scanner); + scannerName = String.valueOf(scannerId); ttl = this.scannerLeaseTimeoutPeriod; - if (scan.getPrefetching()) { - rsh.enablePrefetching(scan.getCaching()); - } } if (rows > 0) { @@ -3052,34 +3020,110 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa // performed even before checking of Lease. // See HBASE-5974 if (request.hasNextCallSeq()) { - if (request.getNextCallSeq() != rsh.nextCallSeq) { - throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq - + " But the nextCallSeq got from client: " + request.getNextCallSeq() + - "; request=" + TextFormat.shortDebugString(request)); + if (rsh == null) { + rsh = scanners.get(scannerName); + } + if (rsh != null) { + if (request.getNextCallSeq() != rsh.nextCallSeq) { + throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq + + " But the nextCallSeq got from client: " + request.getNextCallSeq() + + "; request=" + TextFormat.shortDebugString(request)); + } + // Increment the nextCallSeq value which is the next expected from client. + rsh.nextCallSeq++; } - // Increment the nextCallSeq value which is the next expected from client. - rsh.nextCallSeq++; } + try { + // Remove lease while its being processed in server; protects against case + // where processing of request takes > lease expiration time. + lease = leases.removeLease(scannerName); + List results = new ArrayList(rows); + long currentScanResultSize = 0; + + boolean done = false; + // Call coprocessor. Get region info from scanner. 
+ if (region != null && region.getCoprocessorHost() != null) { + Boolean bypass = region.getCoprocessorHost().preScannerNext( + scanner, results, rows); + if (!results.isEmpty()) { + for (Result r : results) { + if (maxScannerResultSize < Long.MAX_VALUE){ + for (KeyValue kv : r.raw()) { + currentScanResultSize += kv.heapSize(); + } + } + } + } + if (bypass != null && bypass.booleanValue()) { + done = true; + } + } - ttl = this.scannerLeaseTimeoutPeriod; - ScanResult result = rsh.getScanResult(rows); - if (result.isException) { - throw result.ioException; - } + if (!done) { + long maxResultSize = scanner.getMaxResultSize(); + if (maxResultSize <= 0) { + maxResultSize = maxScannerResultSize; + } + List values = new ArrayList(); + MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint()); + region.startRegionOperation(Operation.SCAN); + try { + int i = 0; + synchronized(scanner) { + for (; i < rows + && currentScanResultSize < maxResultSize; i++) { + // Collect values to be returned here + boolean moreRows = scanner.nextRaw(values); + if (!values.isEmpty()) { + if (maxScannerResultSize < Long.MAX_VALUE){ + for (KeyValue kv : values) { + currentScanResultSize += kv.heapSize(); + } + } + results.add(new Result(values)); + } + if (!moreRows) { + break; + } + values.clear(); + } + } + region.readRequestsCount.add(i); + } finally { + region.closeRegionOperation(); + } - moreResults = result.moreResults; - if (result.results != null) { - List cellScannables = - new ArrayList(result.results.size()); - ResultCellMeta.Builder rcmBuilder = ResultCellMeta.newBuilder(); - for (Result res : result.results) { - cellScannables.add(res); - rcmBuilder.addCellsLength(res.size()); + // coprocessor postNext hook + if (region != null && region.getCoprocessorHost() != null) { + region.getCoprocessorHost().postScannerNext(scanner, results, rows, true); + } + } + + // If the scanner's filter - if any - is done with the scan + // and wants to tell the client to stop the scan. This is done by passing + // a null result, and setting moreResults to false. + if (scanner.isFilterDone() && results.isEmpty()) { + moreResults = false; + results = null; + } else { + ResultCellMeta.Builder rcmBuilder = ResultCellMeta.newBuilder(); + List cellScannables = new ArrayList(results.size()); + for (Result res : results) { + cellScannables.add(res); + rcmBuilder.addCellsLength(res.size()); + } + builder.setResultCellMeta(rcmBuilder.build()); + // TODO is this okey to assume the type and cast + ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil + .createCellScanner(cellScannables)); + } + } finally { + // We're done. On way out re-add the above removed lease. + // Adding resets expiration time on lease. + if (scanners.containsKey(scannerName)) { + if (lease != null) leases.addLease(lease); + ttl = this.scannerLeaseTimeoutPeriod; } - builder.setResultCellMeta(rcmBuilder.build()); - // TODO is this okey to assume the type and cast - ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil - .createCellScanner(cellScannables)); } } @@ -3093,13 +3137,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } rsh = scanners.remove(scannerName); if (rsh != null) { - rsh.closeScanner(); - try { - leases.cancelLease(scannerName); - } catch (LeaseException le) { - // That's ok, since the lease may be gone with - // the prefetcher when cancelled. 
- } + scanner = rsh.s; + scanner.close(); + leases.cancelLease(scannerName); if (region != null && region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerClose(scanner); } @@ -4182,6 +4222,18 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString()); } + /** + * Holder class which holds the RegionScanner and nextCallSeq together. + */ + private static class RegionScannerHolder { + private RegionScanner s; + private long nextCallSeq = 0L; + + public RegionScannerHolder(RegionScanner s) { + this.s = s; + } + } + private boolean isHealthCheckerConfigured() { String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java deleted file mode 100644 index 792901d..0000000 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java +++ /dev/null @@ -1,394 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; - -import com.google.common.base.Preconditions; - -/** - * Holder class which holds the RegionScanner, nextCallSeq, ScanPrefetcher - * and information needed for prefetcher/fetcher. - * - * Originally, this is an inner class of HRegionServer. We moved it out - * since HRegionServer is getting bigger and bigger. 
- */ -@InterfaceAudience.Private -public class RegionScannerHolder { - public final static String MAX_PREFETCHED_RESULT_SIZE_KEY - = "hbase.hregionserver.prefetcher.resultsize.max"; - public final static int MAX_PREFETCHED_RESULT_SIZE_DEFAULT = 256 * 1024 * 1024; - - final static Log LOG = LogFactory.getLog(RegionScannerHolder.class); - final static String PREFETCHER_THREAD_PREFIX = "scan-prefetch-"; - - private final static AtomicLong globalPrefetchedResultSize = new AtomicLong(); - - private ThreadPoolExecutor scanPrefetchThreadPool; - private Map scanners; - private long maxScannerResultSize; - private Configuration conf; - private Leases leases; - - private boolean prefetching = false; - private long maxGlobalPrefetchedResultSize; - private volatile Future prefetchScanFuture; - private volatile long prefetchedResultSize; - private ScanPrefetcher prefetcher; - private HRegion region; - private int rows; - - RegionScanner scanner; - long nextCallSeq = 0L; - String scannerName; - - /** - * Get the total size of all prefetched results not retrieved yet. - */ - public static long getPrefetchedResultSize() { - return globalPrefetchedResultSize.get(); - } - - /** - * Construct a RegionScanner holder for a specific region server. - * - * @param rs the region server the specific region is on - * @param s the scanner to be held - * @param r the region the scanner is for - */ - RegionScannerHolder(HRegionServer rs, RegionScanner s, HRegion r) { - scanPrefetchThreadPool = rs.scanPrefetchThreadPool; - maxScannerResultSize = rs.maxScannerResultSize; - prefetcher = new ScanPrefetcher(); - scanners = rs.scanners; - leases = rs.leases; - conf = rs.conf; - scanner = s; - region = r; - } - - public boolean isPrefetchSubmitted() { - return prefetchScanFuture != null; - } - - public HRegionInfo getRegionInfo() { - return region.getRegionInfo(); - } - - /** - * Find the current prefetched result size - */ - public long currentPrefetchedResultSize() { - return prefetchedResultSize; - } - - /** - * Wait till current prefetching task complete, - * return true if any data retrieved, false otherwise. - * Used for unit testing only. - */ - public boolean waitForPrefetchingDone() { - if (prefetchScanFuture != null) { - try { - ScanResult scanResult = prefetchScanFuture.get(); - return scanResult != null && scanResult.results != null - && !scanResult.results.isEmpty(); - } catch (Throwable t) { - LOG.debug("Got exception in getting scan result", t); - if (t instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - } - } - return false; - } - - /** - * Stop any prefetching task and close the scanner. - * @throws IOException - */ - public void closeScanner() throws IOException { - // stop prefetcher if needed. - if (prefetchScanFuture != null) { - synchronized (prefetcher) { - prefetcher.scannerClosing = true; - prefetchScanFuture.cancel(false); - } - prefetchScanFuture = null; - if (prefetchedResultSize > 0) { - globalPrefetchedResultSize.addAndGet(-prefetchedResultSize); - prefetchedResultSize = 0L; - } - } - scanner.close(); - } - - /** - * Get the prefetched scan result, if any. Otherwise, - * do a scan synchronously and return the result, which - * may take some time. Region scan coprocessor, if specified, - * is invoked properly, which may override the scan result. - * - * @param rows the number of rows to scan, which is preferred - * not to change among scanner.next() calls. 
- * - * @return scan result, which has the data retrieved from - * the scanner, or some IOException if the scan failed. - * @throws IOException if failed to retrieve from the scanner. - */ - public ScanResult getScanResult(final int rows) throws IOException { - Preconditions.checkArgument(rows > 0, "Number of rows requested must be positive"); - ScanResult scanResult = null; - this.rows = rows; - - if (prefetchScanFuture == null) { - // Need to scan inline if not prefetched - scanResult = prefetcher.call(); - } else { - // if we have a prefetched result, then use it - try { - scanResult = prefetchScanFuture.get(); - if (scanResult.moreResults) { - int prefetchedRows = scanResult.results.size(); - if (prefetchedRows != 0 && this.rows > prefetchedRows) { - // Try to scan more since we haven't prefetched enough - this.rows -= prefetchedRows; - ScanResult tmp = prefetcher.call(); - if (tmp.isException) { - return tmp; // Keep the prefetched results for later - } - if (tmp.results != null && !tmp.results.isEmpty()) { - // Merge new results to the old result list - scanResult.results.addAll(tmp.results); - } - // Reset rows for next prefetching - this.rows = rows; - } - } - prefetchScanFuture = null; - if (prefetchedResultSize > 0) { - globalPrefetchedResultSize.addAndGet(-prefetchedResultSize); - prefetchedResultSize = 0L; - } - } catch (ExecutionException ee) { - throw new IOException("failed to run prefetching task", ee.getCause()); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - IOException iie = new InterruptedIOException("scan was interrupted"); - iie.initCause(ie); - throw iie; - } - } - - if (prefetching - && scanResult.moreResults && !scanResult.results.isEmpty()) { - long totalPrefetchedResultSize = globalPrefetchedResultSize.get(); - if (totalPrefetchedResultSize < maxGlobalPrefetchedResultSize) { - // Schedule a background prefetch for the next result - // if prefetch is enabled on scans and there are more results - prefetchScanFuture = scanPrefetchThreadPool.submit(prefetcher); - } else if (LOG.isTraceEnabled()) { - LOG.trace("One prefetching is skipped for scanner " + scannerName - + " since total prefetched result size " + totalPrefetchedResultSize - + " is more than the maximum configured " - + maxGlobalPrefetchedResultSize); - } - } - return scanResult; - } - - /** - * Set the rows to prefetch, and start the prefetching task. - */ - public void enablePrefetching(int caching) { - if (caching > 0) { - rows = caching; - } else { - rows = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); - } - maxGlobalPrefetchedResultSize = conf.getLong( - MAX_PREFETCHED_RESULT_SIZE_KEY, MAX_PREFETCHED_RESULT_SIZE_DEFAULT); - if (globalPrefetchedResultSize.get() < maxGlobalPrefetchedResultSize) { - prefetchScanFuture = scanPrefetchThreadPool.submit(prefetcher); - } - prefetching = true; - } - - /** - * This Callable abstracts calling a pre-fetch next. This is called on a - * threadpool. It makes a pre-fetch next call with the same parameters as - * the incoming next call. Note that the number of rows to return (nbRows) - * and/or the memory size for the result is the same as the previous call if - * pre-fetching is enabled. If these parameters change dynamically, - * they will take effect in the subsequent iteration. 
- */ - class ScanPrefetcher implements Callable { - boolean scannerClosing = false; - - public ScanResult call() { - ScanResult scanResult = null; - Leases.Lease lease = null; - try { - // Remove lease while its being processed in server; protects against case - // where processing of request takes > lease expiration time. - lease = leases.removeLease(scannerName); - List results = new ArrayList(rows); - long currentScanResultSize = 0; - boolean moreResults = true; - - boolean done = false; - long maxResultSize = scanner.getMaxResultSize(); - if (maxResultSize <= 0) { - maxResultSize = maxScannerResultSize; - } - String threadName = Thread.currentThread().getName(); - boolean prefetchingThread = threadName.startsWith(PREFETCHER_THREAD_PREFIX); - // Call coprocessor. Get region info from scanner. - if (region != null && region.getCoprocessorHost() != null) { - Boolean bypass = region.getCoprocessorHost().preScannerNext( - scanner, results, rows); - if (!results.isEmpty() - && (prefetchingThread || maxResultSize < Long.MAX_VALUE)) { - for (Result r : results) { - for (KeyValue kv : r.raw()) { - currentScanResultSize += kv.heapSize(); - } - } - } - if (bypass != null && bypass.booleanValue()) { - done = true; - } - } - - if (!done) { - List values = new ArrayList(); - MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint()); - region.startRegionOperation(); - try { - int i = 0; - synchronized(scanner) { - for (; i < rows - && currentScanResultSize < maxResultSize; i++) { - // Collect values to be returned here - boolean moreRows = scanner.nextRaw(values); - if (!values.isEmpty()) { - if (prefetchingThread || maxResultSize < Long.MAX_VALUE){ - for (KeyValue kv : values) { - currentScanResultSize += kv.heapSize(); - } - } - results.add(new Result(values)); - } - if (!moreRows) { - break; - } - values.clear(); - } - } - region.readRequestsCount.add(i); - } finally { - region.closeRegionOperation(); - } - - // coprocessor postNext hook - if (region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postScannerNext(scanner, results, rows, true); - } - } - - // If the scanner's filter - if any - is done with the scan - // and wants to tell the client to stop the scan. This is done by passing - // a null result, and setting moreResults to false. - if (scanner.isFilterDone() && results.isEmpty()) { - moreResults = false; - results = null; - } - scanResult = new ScanResult(moreResults, results); - if (prefetchingThread && currentScanResultSize > 0) { - synchronized (prefetcher) { - if (!scannerClosing) { - globalPrefetchedResultSize.addAndGet(currentScanResultSize); - prefetchedResultSize = currentScanResultSize; - } - } - } - } catch (IOException e) { - // we should queue the exception as the result so that we can return - // this when the result is asked for - scanResult = new ScanResult(e); - } finally { - // We're done. On way out re-add the above removed lease. - // Adding resets expiration time on lease. - if (scanners.containsKey(scannerName)) { - if (lease != null) { - try { - leases.addLease(lease); - } catch (LeaseStillHeldException e) { - LOG.error("THIS SHOULD NOT HAPPEN", e); - } - } - } - } - return scanResult; - } - } -} - -/** - * This class abstracts the results of a single scanner's result. It tracks - * the list of Result objects if the pre-fetch next was successful, and - * tracks the exception if the next failed. 
- */
-class ScanResult {
-  final boolean isException;
-  IOException ioException = null;
-
-  List results = null;
-  boolean moreResults = false;
-
-  public ScanResult(IOException ioException) {
-    this.ioException = ioException;
-    isException = true;
-  }
-
-  public ScanResult(boolean moreResults, List results) {
-    this.moreResults = moreResults;
-    this.results = results;
-    isException = false;
-  }
-}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index df835f7..4b86ff6 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -17,41 +17,31 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTestConst;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
-import org.apache.hadoop.hbase.regionserver.RegionScannerHolder;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 /**
  * A client-side test, mostly testing scanners with various parameters.
  */
 @Category(MediumTests.class)
-@RunWith(Parameterized.class)
 public class TestScannersFromClientSide {
   private static final Log LOG = LogFactory.getLog(TestScannersFromClientSide.class);
 
@@ -61,37 +51,6 @@ public class TestScannersFromClientSide {
   private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
   private static byte [] VALUE = Bytes.toBytes("testValue");
 
-  private final boolean prefetching;
-  private long maxSize;
-
-  @Parameters
-  public static final Collection parameters() {
-    List prefetchings = new ArrayList();
-    prefetchings.add(new Object[] {Long.valueOf(-1)});
-    prefetchings.add(new Object[] {Long.valueOf(0)});
-    prefetchings.add(new Object[] {Long.valueOf(1)});
-    prefetchings.add(new Object[] {Long.valueOf(1024)});
-    return prefetchings;
-  }
-
-  public TestScannersFromClientSide(Long maxPrefetchedResultSize) {
-    this.maxSize = maxPrefetchedResultSize.longValue();
-    if (this.maxSize < 0) {
-      this.prefetching = false;
-    } else {
-      this.prefetching = true;
-      if (this.maxSize == 0) {
-        this.maxSize = RegionScannerHolder.MAX_PREFETCHED_RESULT_SIZE_DEFAULT;
-      } else {
-        MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-        for (JVMClusterUtil.RegionServerThread rst: cluster.getLiveRegionServerThreads()) {
-          Configuration conf = rst.getRegionServer().getConfiguration();
-          conf.setLong(RegionScannerHolder.MAX_PREFETCHED_RESULT_SIZE_KEY, maxSize);
-        }
-      }
-    }
-  }
-
   /**
    * @throws java.lang.Exception
    */
@@ -106,9 +65,22 @@ public class TestScannersFromClientSide {
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
-    long remainingPrefetchedSize = RegionScannerHolder.getPrefetchedResultSize();
-    assertEquals("All prefetched results should be gone",
-      0, remainingPrefetchedSize);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    // Nothing to do.
   }
 
   /**
@@ -117,23 +89,8 @@ public class TestScannersFromClientSide {
    * @throws Exception
    */
   @Test
-  public void testScanBatchWithDefaultCaching() throws Exception {
-    batchedScanWithCachingSpecified(-1); // Using default caching which is 100
-  }
-
-  /**
-   * Test from client side for batch of scan
-   *
-   * @throws Exception
-   */
-  @Test
   public void testScanBatch() throws Exception {
-    batchedScanWithCachingSpecified(1);
-  }
-
-  private void batchedScanWithCachingSpecified(int caching) throws Exception {
-    byte [] TABLE = Bytes.toBytes(
-      "testScanBatch-" + prefetching + "_" + maxSize + "_" + caching);
+    byte [] TABLE = Bytes.toBytes("testScanBatch");
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
     HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
 
@@ -142,7 +99,7 @@ public class TestScannersFromClientSide {
     Scan scan;
     Delete delete;
     Result result;
-    ClientScanner scanner;
+    ResultScanner scanner;
     boolean toLog = true;
     List kvListExp;
 
@@ -167,11 +124,8 @@ public class TestScannersFromClientSide {
 
    // without batch
    scan = new Scan(ROW);
-    scan.setCaching(caching);
    scan.setMaxVersions();
-    scan.setPrefetching(prefetching);
-    scanner = (ClientScanner)ht.getScanner(scan);
-    verifyPrefetching(scanner);
+    scanner = ht.getScanner(scan);
 
    // c4:4, c5:5, c6:6, c7:7
    kvListExp = new ArrayList();
@@ -181,16 +135,12 @@ public class TestScannersFromClientSide {
    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
    result = scanner.next();
    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
-    verifyPrefetching(scanner);
 
    // with batch
    scan = new Scan(ROW);
-    scan.setCaching(caching);
    scan.setMaxVersions();
    scan.setBatch(2);
-    scan.setPrefetching(prefetching);
-    scanner = (ClientScanner)ht.getScanner(scan);
-    verifyPrefetching(scanner);
+    scanner = ht.getScanner(scan);
 
    // First batch: c4:4, c5:5
    kvListExp = new ArrayList();
@@ -198,7 +148,6 @@ public class TestScannersFromClientSide {
    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
    result = scanner.next();
    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
-    verifyPrefetching(scanner);
 
    // Second batch: c6:6, c7:7
    kvListExp = new ArrayList();
@@ -206,7 +155,7 @@ public class TestScannersFromClientSide {
    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
    result = scanner.next();
    verifyResult(result, kvListExp, toLog, "Testing second batch of scan");
-    verifyPrefetching(scanner);
+
  }
 
  /**
@@ -216,7 +165,7 @@ public class TestScannersFromClientSide {
   */
  @Test
  public void testGetMaxResults() throws Exception {
-    byte [] TABLE = Bytes.toBytes("testGetMaxResults-" + prefetching + "_" + maxSize);
+    byte [] TABLE = Bytes.toBytes("testGetMaxResults");
    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
 
@@ -336,7 +285,7 @@ public class TestScannersFromClientSide {
   */
  @Test
  public void testScanMaxResults() throws Exception {
-    byte [] TABLE = Bytes.toBytes("testScanLimit-" + prefetching + "_" + maxSize);
+    byte [] TABLE = Bytes.toBytes("testScanLimit");
    byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
@@ -366,19 +315,17 @@ public class TestScannersFromClientSide {
    }
 
    scan = new Scan();
-    scan.setCaching(1);
-    scan.setPrefetching(prefetching);
    scan.setMaxResultsPerColumnFamily(4);
-    ClientScanner scanner = (ClientScanner)ht.getScanner(scan);
+    ResultScanner scanner = ht.getScanner(scan);
    kvListScan = new ArrayList();
    while ((result = scanner.next()) != null) {
-      verifyPrefetching(scanner);
      for (KeyValue kv : result.list()) {
        kvListScan.add(kv);
      }
    }
    result = new Result(kvListScan);
    verifyResult(result, kvListExp, toLog, "Testing scan with maxResults");
+
  }
 
  /**
@@ -388,7 +335,7 @@ public class TestScannersFromClientSide {
   */
  @Test
  public void testGetRowOffset() throws Exception {
-    byte [] TABLE = Bytes.toBytes("testGetRowOffset-" + prefetching + "_" + maxSize);
+    byte [] TABLE = Bytes.toBytes("testGetRowOffset");
    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
 
@@ -476,48 +423,6 @@ public class TestScannersFromClientSide {
        "Testing offset + multiple CFs + maxResults");
  }
 
-  /**
-   * For testing only, find a region scanner holder for a scan.
-   */
-  RegionScannerHolder findRegionScannerHolder(ClientScanner scanner) {
-    long scannerId = scanner.currentScannerId();
-    if (scannerId == -1L) return null;
-
-    HRegionInfo expectedRegion = scanner.currentRegionInfo();
-    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-    for (JVMClusterUtil.RegionServerThread rst: cluster.getLiveRegionServerThreads()) {
-      RegionScannerHolder rsh = rst.getRegionServer().getScannerHolder(scannerId);
-      if (rsh != null && rsh.getRegionInfo().equals(expectedRegion)) {
-        return rsh;
-      }
-    }
-    return null;
-  }
-
-  void verifyPrefetching(ClientScanner scanner) throws IOException {
-    long scannerId = scanner.currentScannerId();
-    if (scannerId == -1L) return; // scanner is already closed
-    RegionScannerHolder rsh = findRegionScannerHolder(scanner);
-    assertNotNull("We should be able to find the scanner", rsh);
-    boolean isPrefetchSubmitted = rsh.isPrefetchSubmitted();
-    if (prefetching && (RegionScannerHolder.getPrefetchedResultSize() < this.maxSize)) {
-      assertTrue("Prefetching should be submitted or no more result",
-        isPrefetchSubmitted || scanner.next() == null);
-    } else if (isPrefetchSubmitted) {
-      // Prefetch submitted, it must be because prefetching is enabled,
-      // and there was still room before it's scheduled
-      long sizeBefore = RegionScannerHolder.getPrefetchedResultSize() -
-        rsh.currentPrefetchedResultSize();
-      assertTrue("There should have room before prefetching is submitted, maxSize=" +
-        this.maxSize + ", prefetching=" + prefetching + ", sizeBefore=" + sizeBefore,
-        prefetching && sizeBefore < this.maxSize);
-    }
-    if (isPrefetchSubmitted && rsh.waitForPrefetchingDone()) {
-      assertTrue("Prefetched result size should not be 0",
-        rsh.currentPrefetchedResultSize() > 0);
-    }
-  }
-
  static void verifyResult(Result result, List expKvList, boolean toLog,
      String msg) {
 
@@ -543,4 +448,6 @@ public class TestScannersFromClientSide {
    assertEquals(expKvList.size(), result.size());
  }
 
+
+
 }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
index d40c8d0..f9ee5d3 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcesso
 import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -130,7 +131,6 @@ public class TestRowProcessorEndpoint {
      // ignore table not found
    }
    table = util.createTable(TABLE, FAM);
-    table.setAutoFlush(false);
    {
      Put put = new Put(ROW);
      put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A
@@ -144,8 +144,6 @@ public class TestRowProcessorEndpoint {
      put.add(FAM, F, G);
      table.put(put);
      row2Size = put.size();
-      table.clearRegionCache();
-      table.flushCommits();
    }
 
  @Test
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
index 6237f64..6d1bcfd 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
@@ -299,7 +299,6 @@ public class TestProtobufUtil {
    scanBuilder = ClientProtos.Scan.newBuilder(proto);
    scanBuilder.setMaxVersions(1);
    scanBuilder.setCacheBlocks(true);
-    scanBuilder.setPrefetching(true);
 
    Scan scan = ProtobufUtil.toScan(proto);
    assertEquals(scanBuilder.build(), ProtobufUtil.toScan(scan));
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
index e041a1f..4608ed6 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
@@ -335,7 +335,6 @@ public class TestRegionServerMetrics {
    Scan s = new Scan();
    s.setBatch(1);
    s.setCaching(1);
-    s.setPrefetching(false);
    ResultScanner resultScanners = t.getScanner(s);
 
    for (int nextCount = 0; nextCount < 30; nextCount++) {
-- 
1.7.10.2 (Apple Git-33)