From 3660752e2f633cf1a7f39a221026b78be926445a Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 28 Nov 2016 20:22:25 +0800 Subject: [PATCH] HBASE-17167 Pass mvcc to client when scan --- .../apache/hadoop/hbase/client/ClientScanner.java | 406 ++++++++++---------- .../org/apache/hadoop/hbase/client/HTable.java | 7 +- .../hbase/client/PackagePrivateFieldAccessor.java | 41 ++ .../java/org/apache/hadoop/hbase/client/Scan.java | 55 ++- .../hadoop/hbase/client/ScannerCallable.java | 3 + .../apache/hadoop/hbase/protobuf/ProtobufUtil.java | 8 + .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 8 + .../shaded/protobuf/generated/ClientProtos.java | 412 +++++++++++++++----- .../src/main/protobuf/Client.proto | 6 + .../hbase/protobuf/generated/ClientProtos.java | 416 ++++++++++++++++----- hbase-protocol/src/main/protobuf/Client.proto | 6 + .../apache/hadoop/hbase/regionserver/HRegion.java | 9 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 4 +- .../hbase/TestPartialResultsFromClientSide.java | 13 +- .../hbase/client/TestMvccConsistentScanner.java | 134 +++++++ .../apache/hadoop/hbase/regionserver/TestTags.java | 14 +- .../regionserver/TestReplicationSink.java | 22 +- 17 files changed, 1114 insertions(+), 450 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/PackagePrivateFieldAccessor.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMvccConsistentScanner.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 20ed183..dbff049 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -120,198 +120,192 @@ public abstract class ClientScanner extends AbstractClientScanner { ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Scan table=" + tableName - + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); - } - this.scan = scan; - this.tableName = tableName; - this.lastNext = System.currentTimeMillis(); - this.connection = connection; - this.pool = pool; - this.primaryOperationTimeout = primaryOperationTimeout; - this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - if (scan.getMaxResultSize() > 0) { - this.maxScannerResultSize = scan.getMaxResultSize(); - } else { - this.maxScannerResultSize = conf.getLong( - HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); - } - this.scannerTimeout = HBaseConfiguration.getInt(conf, - HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, - HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); - - // check if application wants to collect scan metrics - initScanMetrics(scan); - - // Use the caching from the Scan. If not set, use the default cache setting for this table. - if (this.scan.getCaching() > 0) { - this.caching = this.scan.getCaching(); - } else { - this.caching = conf.getInt( - HConstants.HBASE_CLIENT_SCANNER_CACHING, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); - } - - this.caller = rpcFactory. newCaller(); - this.rpcControllerFactory = controllerFactory; - - this.conf = conf; - initCache(); - initializeScannerInConstruction(); - } + if (LOG.isTraceEnabled()) { + LOG.trace( + "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); + } + this.scan = scan; + this.tableName = tableName; + this.lastNext = System.currentTimeMillis(); + this.connection = connection; + this.pool = pool; + this.primaryOperationTimeout = primaryOperationTimeout; + this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + if (scan.getMaxResultSize() > 0) { + this.maxScannerResultSize = scan.getMaxResultSize(); + } else { + this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + } + this.scannerTimeout = + HBaseConfiguration.getInt(conf, HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + + // check if application wants to collect scan metrics + initScanMetrics(scan); + + // Use the caching from the Scan. If not set, use the default cache setting for this table. + if (this.scan.getCaching() > 0) { + this.caching = this.scan.getCaching(); + } else { + this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + } + + this.caller = rpcFactory. newCaller(); + this.rpcControllerFactory = controllerFactory; + + this.conf = conf; + initCache(); + initializeScannerInConstruction(); + } - protected abstract void initCache(); + protected abstract void initCache(); - protected void initializeScannerInConstruction() throws IOException{ - // initialize the scanner - nextScanner(this.caching, false); - } + protected void initializeScannerInConstruction() throws IOException { + // initialize the scanner + nextScanner(this.caching, false); + } - protected ClusterConnection getConnection() { - return this.connection; - } + protected ClusterConnection getConnection() { + return this.connection; + } - protected TableName getTable() { - return this.tableName; - } + protected TableName getTable() { + return this.tableName; + } - protected int getRetries() { - return this.retries; - } + protected int getRetries() { + return this.retries; + } - protected int getScannerTimeout() { - return this.scannerTimeout; - } + protected int getScannerTimeout() { + return this.scannerTimeout; + } - protected Configuration getConf() { - return this.conf; - } + protected Configuration getConf() { + return this.conf; + } - protected Scan getScan() { - return scan; - } + protected Scan getScan() { + return scan; + } - protected ExecutorService getPool() { - return pool; - } + protected ExecutorService getPool() { + return pool; + } - protected int getPrimaryOperationTimeout() { - return primaryOperationTimeout; - } + protected int getPrimaryOperationTimeout() { + return primaryOperationTimeout; + } - protected int getCaching() { - return caching; - } + protected int getCaching() { + return caching; + } - protected long getTimestamp() { - return lastNext; - } + protected long getTimestamp() { + return lastNext; + } - @VisibleForTesting - protected long getMaxResultSize() { - return maxScannerResultSize; - } + @VisibleForTesting + protected long getMaxResultSize() { + return maxScannerResultSize; + } - // returns true if the passed region endKey - protected boolean checkScanStopRow(final byte [] endKey) { - if (this.scan.getStopRow().length > 0) { - // there is a stop row, check to see if we are past it. - byte [] stopRow = scan.getStopRow(); - int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, - endKey, 0, endKey.length); - if (cmp <= 0) { - // stopRow <= endKey (endKey is equals to or larger than stopRow) - // This is a stop. - return true; - } + // returns true if the passed region endKey + protected boolean checkScanStopRow(final byte[] endKey) { + if (this.scan.getStopRow().length > 0) { + // there is a stop row, check to see if we are past it. + byte[] stopRow = scan.getStopRow(); + int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, endKey, 0, endKey.length); + if (cmp <= 0) { + // stopRow <= endKey (endKey is equals to or larger than stopRow) + // This is a stop. + return true; } - return false; //unlikely. - } - - private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException { - // If we have just switched replica, don't go to the next scanner yet. Rather, try - // the scanner operations on the new replica, from the right point in the scan - // Note that when we switched to a different replica we left it at a point - // where we just did the "openScanner" with the appropriate startrow - if (callable != null && callable.switchedToADifferentReplica()) return true; - return nextScanner(nbRows, done); } + return false; // unlikely. + } - /* - * Gets a scanner for the next region. If this.currentRegion != null, then - * we will move to the endrow of this.currentRegion. Else we will get - * scanner at the scan.getStartRow(). We will go no further, just tidy - * up outstanding scanners, if currentRegion != null and - * done is true. - * @param nbRows - * @param done Server-side says we're done scanning. - */ - protected boolean nextScanner(int nbRows, final boolean done) - throws IOException { - // Close the previous scanner if it's open - if (this.callable != null) { - this.callable.setClose(); - call(callable, caller, scannerTimeout); - this.callable = null; - } + private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException { + // If we have just switched replica, don't go to the next scanner yet. Rather, try + // the scanner operations on the new replica, from the right point in the scan + // Note that when we switched to a different replica we left it at a point + // where we just did the "openScanner" with the appropriate startrow + if (callable != null && callable.switchedToADifferentReplica()) return true; + return nextScanner(nbRows, done); + } - // Where to start the next scanner - byte [] localStartKey; - - // if we're at end of table, close and return false to stop iterating - if (this.currentRegion != null) { - byte [] endKey = this.currentRegion.getEndKey(); - if (endKey == null || - Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) || - checkScanStopRow(endKey) || - done) { - close(); - if (LOG.isTraceEnabled()) { - LOG.trace("Finished " + this.currentRegion); - } - return false; - } - localStartKey = endKey; + /* + * Gets a scanner for the next region. If this.currentRegion != null, then we will move to the + * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). We will go no + * further, just tidy up outstanding scanners, if currentRegion != null and + * done is true. + * @param nbRows + * @param done Server-side says we're done scanning. + */ + protected boolean nextScanner(int nbRows, final boolean done) throws IOException { + // Close the previous scanner if it's open + if (this.callable != null) { + this.callable.setClose(); + call(callable, caller, scannerTimeout); + this.callable = null; + } + + // Where to start the next scanner + byte[] localStartKey; + + // if we're at end of table, close and return false to stop iterating + if (this.currentRegion != null) { + byte[] endKey = this.currentRegion.getEndKey(); + if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) + || checkScanStopRow(endKey) || done) { + close(); if (LOG.isTraceEnabled()) { LOG.trace("Finished " + this.currentRegion); } - } else { - localStartKey = this.scan.getStartRow(); + return false; } - - if (LOG.isDebugEnabled() && this.currentRegion != null) { - // Only worth logging if NOT first region in scan. - LOG.debug("Advancing internal scanner to startKey at '" + - Bytes.toStringBinary(localStartKey) + "'"); + localStartKey = endKey; + // clear mvcc read point if we are going to switch regions + scan.setMvccReadPoint(0L); + if (LOG.isTraceEnabled()) { + LOG.trace("Finished " + this.currentRegion); } - try { - callable = getScannerCallable(localStartKey, nbRows); - // Open a scanner on the region server starting at the - // beginning of the region - call(callable, caller, scannerTimeout); - this.currentRegion = callable.getHRegionInfo(); - if (this.scanMetrics != null) { - this.scanMetrics.countOfRegions.incrementAndGet(); - } - } catch (IOException e) { - close(); - throw e; + } else { + localStartKey = this.scan.getStartRow(); + } + + if (LOG.isDebugEnabled() && this.currentRegion != null) { + // Only worth logging if NOT first region in scan. + LOG.debug( + "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); + } + try { + callable = getScannerCallable(localStartKey, nbRows); + // Open a scanner on the region server starting at the + // beginning of the region + call(callable, caller, scannerTimeout); + this.currentRegion = callable.getHRegionInfo(); + if (this.scanMetrics != null) { + this.scanMetrics.countOfRegions.incrementAndGet(); } - return true; + } catch (IOException e) { + close(); + throw e; } + return true; + } @VisibleForTesting boolean isAnyRPCcancelled() { return callable.isAnyRPCcancelled(); } - Result[] call(ScannerCallableWithReplicas callable, - RpcRetryingCaller caller, int scannerTimeout) - throws IOException, RuntimeException { + Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller caller, + int scannerTimeout) throws IOException, RuntimeException { if (Thread.interrupted()) { throw new InterruptedIOException(); } @@ -320,61 +314,57 @@ public abstract class ClientScanner extends AbstractClientScanner { return caller.callWithoutRetries(callable, scannerTimeout); } - @InterfaceAudience.Private - protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey, - int nbRows) { - scan.setStartRow(localStartKey); - ScannerCallable s = - new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, - this.rpcControllerFactory); - s.setCaching(nbRows); - ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(), - s, pool, primaryOperationTimeout, scan, - retries, scannerTimeout, caching, conf, caller); - return sr; - } + @InterfaceAudience.Private + protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey, int nbRows) { + scan.setStartRow(localStartKey); + ScannerCallable s = new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, + this.rpcControllerFactory); + s.setCaching(nbRows); + ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(), s, + pool, primaryOperationTimeout, scan, retries, scannerTimeout, caching, conf, caller); + return sr; + } - /** - * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the - * application or TableInputFormat.Later, we could push it to other systems. We don't use - * metrics framework because it doesn't support multi-instances of the same metrics on the same - * machine; for scan/map reduce scenarios, we will have multiple scans running at the same time. - * - * By default, scan metrics are disabled; if the application wants to collect them, this - * behavior can be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)} - * - *

This invocation clears the scan metrics. Metrics are aggregated in the Scan instance. - */ - protected void writeScanMetrics() { - if (this.scanMetrics == null || scanMetricsPublished) { - return; - } - MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics); - scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray()); - scanMetricsPublished = true; + /** + * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the + * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics + * framework because it doesn't support multi-instances of the same metrics on the same machine; + * for scan/map reduce scenarios, we will have multiple scans running at the same time. By + * default, scan metrics are disabled; if the application wants to collect them, this behavior can + * be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)} + *

+ * This invocation clears the scan metrics. Metrics are aggregated in the Scan instance. + */ + protected void writeScanMetrics() { + if (this.scanMetrics == null || scanMetricsPublished) { + return; } + MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics); + scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray()); + scanMetricsPublished = true; + } - protected void initSyncCache() { + protected void initSyncCache() { cache = new LinkedList(); } - protected Result nextWithSyncCache() throws IOException { - // If the scanner is closed and there's nothing left in the cache, next is a no-op. - if (cache.size() == 0 && this.closed) { - return null; - } - if (cache.size() == 0) { - loadCache(); - } - - if (cache.size() > 0) { - return cache.poll(); - } - - // if we exhausted this scanner before calling close, write out the scan metrics - writeScanMetrics(); + protected Result nextWithSyncCache() throws IOException { + // If the scanner is closed and there's nothing left in the cache, next is a no-op. + if (cache.size() == 0 && this.closed) { return null; } + if (cache.size() == 0) { + loadCache(); + } + + if (cache.size() > 0) { + return cache.poll(); + } + + // if we exhausted this scanner before calling close, write out the scan metrics + writeScanMetrics(); + return null; + } @VisibleForTesting public int getCacheSize() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index b2c012d..c56132c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -334,7 +334,7 @@ public class HTable implements Table { * {@link Table#getScanner(Scan)} has other usage details. */ @Override - public ResultScanner getScanner(final Scan scan) throws IOException { + public ResultScanner getScanner(Scan scan) throws IOException { if (scan.getBatch() > 0 && scan.isSmall()) { throw new IllegalArgumentException("Small scan should not be used with batching"); } @@ -345,7 +345,10 @@ public class HTable implements Table { if (scan.getMaxResultSize() <= 0) { scan.setMaxResultSize(scannerMaxResultSize); } - + if (scan.getMvccReadPoint() > 0) { + // it is not supposed to be set by user, clear + scan.resetMvccReadPoint(); + } Boolean async = scan.isAsyncPrefetch(); if (async == null) { async = connConfiguration.isClientScannerAsyncPrefetch(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PackagePrivateFieldAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PackagePrivateFieldAccessor.java new file mode 100644 index 0000000..6a3ac18 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PackagePrivateFieldAccessor.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A helper class used to access the package private field in o.a.h.h.client package. + *

+ * This is because we share some data structures between client and server and the data structures + * are marked as {@code InterfaceAudience.Public}, but we do not want to expose some of the fields + * to end user. + *

+ * TODO: A better solution is to separate the data structures used in client and server. + */ +@InterfaceAudience.Private +public class PackagePrivateFieldAccessor { + + public static void setMvccReadPoint(Scan scan, long mvccReadPoint) { + scan.setMvccReadPoint(mvccReadPoint); + } + + public static long getMvccReadPoint(Scan scan) { + return scan.getMvccReadPoint(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index b0d361c..853f5b1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -154,27 +154,24 @@ public class Scan extends Query { */ public static final boolean DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH = false; - /** - * Set it true for small scan to get better performance - * - * Small scan should use pread and big scan can use seek + read - * - * seek + read is fast but can cause two problem (1) resource contention (2) - * cause too much network io - * - * [89-fb] Using pread for non-compaction read request - * https://issues.apache.org/jira/browse/HBASE-7266 - * - * On the other hand, if setting it true, we would do - * openScanner,next,closeScanner in one RPC call. It means the better - * performance for small scan. [HBASE-9488]. - * - * Generally, if the scan range is within one data block(64KB), it could be - * considered as a small scan. + /** + * Set it true for small scan to get better performance Small scan should use pread and big scan + * can use seek + read seek + read is fast but can cause two problem (1) resource contention (2) + * cause too much network io [89-fb] Using pread for non-compaction read request + * https://issues.apache.org/jira/browse/HBASE-7266 On the other hand, if setting it true, we + * would do openScanner,next,closeScanner in one RPC call. It means the better performance for + * small scan. [HBASE-9488]. Generally, if the scan range is within one data block(64KB), it could + * be considered as a small scan. */ private boolean small = false; /** + * The mvcc read point to use when open a scanner. Remember to clear it after switching regions as + * the mvcc is only valid within region scope. + */ + private long mvccReadPoint = -1L; + + /** * Create a Scan operation across all rows. */ public Scan() {} @@ -253,6 +250,7 @@ public class Scan extends Query { TimeRange tr = entry.getValue(); setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } + this.mvccReadPoint = scan.getMvccReadPoint(); } /** @@ -281,6 +279,7 @@ public class Scan extends Query { TimeRange tr = entry.getValue(); setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } + this.mvccReadPoint = 0; } public boolean isGetScan() { @@ -976,4 +975,26 @@ public class Scan extends Query { this.asyncPrefetch = asyncPrefetch; return this; } + + /** + * Get the mvcc read point used to open a scanner. + */ + long getMvccReadPoint() { + return mvccReadPoint; + } + + /** + * Set the mvcc read point used to open a scanner. + */ + Scan setMvccReadPoint(long mvccReadPoint) { + this.mvccReadPoint = mvccReadPoint; + return this; + } + + /** + * Set the mvcc read point to -1 which means do not use it. + */ + Scan resetMvccReadPoint() { + return setMvccReadPoint(-1L); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 0351e54..7a22648 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -375,6 +375,9 @@ public class ScannerCallable extends ClientServiceCallable { LOG.info("Open scanner=" + id + " for scan=" + scan.toString() + " on region " + getLocation().toString()); } + if (response.hasMvccReadPoint()) { + this.scan.setMvccReadPoint(response.getMvccReadPoint()); + } return id; } catch (Exception e) { throw ProtobufUtil.handleRemoteException(e); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 330348d..c52d413 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -907,6 +908,10 @@ public final class ProtobufUtil { if (scan.getCaching() > 0) { scanBuilder.setCaching(scan.getCaching()); } + long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan); + if (mvccReadPoint > 0) { + scanBuilder.setMvccReadPoint(mvccReadPoint); + } return scanBuilder.build(); } @@ -994,6 +999,9 @@ public final class ProtobufUtil { if (proto.hasCaching()) { scan.setCaching(proto.getCaching()); } + if (proto.hasMvccReadPoint()) { + PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint()); + } return scan; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 7d1770e..5876fae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLoadStats; import org.apache.hadoop.hbase.client.Result; @@ -1019,6 +1020,10 @@ public final class ProtobufUtil { if (scan.getCaching() > 0) { scanBuilder.setCaching(scan.getCaching()); } + long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan); + if (mvccReadPoint > 0) { + scanBuilder.setMvccReadPoint(mvccReadPoint); + } return scanBuilder.build(); } @@ -1106,6 +1111,9 @@ public final class ProtobufUtil { if (proto.hasCaching()) { scan.setCaching(proto.getCaching()); } + if (proto.hasMvccReadPoint()) { + PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint()); + } return scan; } diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java index bfd196e..e9458df 100644 --- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java +++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java @@ -14554,6 +14554,15 @@ public final class ClientProtos { */ org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ColumnFamilyTimeRangeOrBuilder getCfTimeRangeOrBuilder( int index); + + /** + * optional uint64 mvcc_read_point = 20 [default = 0]; + */ + boolean hasMvccReadPoint(); + /** + * optional uint64 mvcc_read_point = 20 [default = 0]; + */ + long getMvccReadPoint(); } /** *

@@ -14594,6 +14603,7 @@ public final class ClientProtos {
       caching_ = 0;
       allowPartialResults_ = false;
       cfTimeRange_ = java.util.Collections.emptyList();
+      mvccReadPoint_ = 0L;
     }
 
     @java.lang.Override
@@ -14753,6 +14763,11 @@ public final class ClientProtos {
                   input.readMessage(org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ColumnFamilyTimeRange.PARSER, extensionRegistry));
               break;
             }
+            case 160: {
+              bitField0_ |= 0x00010000;
+              mvccReadPoint_ = input.readUInt64();
+              break;
+            }
           }
         }
       } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
@@ -15153,6 +15168,21 @@ public final class ClientProtos {
       return cfTimeRange_.get(index);
     }
 
+    public static final int MVCC_READ_POINT_FIELD_NUMBER = 20;
+    private long mvccReadPoint_;
+    /**
+     * optional uint64 mvcc_read_point = 20 [default = 0];
+     */
+    public boolean hasMvccReadPoint() {
+      return ((bitField0_ & 0x00010000) == 0x00010000);
+    }
+    /**
+     * optional uint64 mvcc_read_point = 20 [default = 0];
+     */
+    public long getMvccReadPoint() {
+      return mvccReadPoint_;
+    }
+
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
       byte isInitialized = memoizedIsInitialized;
@@ -15246,6 +15276,9 @@ public final class ClientProtos {
       for (int i = 0; i < cfTimeRange_.size(); i++) {
         output.writeMessage(19, cfTimeRange_.get(i));
       }
+      if (((bitField0_ & 0x00010000) == 0x00010000)) {
+        output.writeUInt64(20, mvccReadPoint_);
+      }
       unknownFields.writeTo(output);
     }
 
@@ -15330,6 +15363,10 @@ public final class ClientProtos {
         size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
           .computeMessageSize(19, cfTimeRange_.get(i));
       }
+      if (((bitField0_ & 0x00010000) == 0x00010000)) {
+        size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(20, mvccReadPoint_);
+      }
       size += unknownFields.getSerializedSize();
       memoizedSize = size;
       return size;
@@ -15432,6 +15469,11 @@ public final class ClientProtos {
       }
       result = result && getCfTimeRangeList()
           .equals(other.getCfTimeRangeList());
+      result = result && (hasMvccReadPoint() == other.hasMvccReadPoint());
+      if (hasMvccReadPoint()) {
+        result = result && (getMvccReadPoint()
+            == other.getMvccReadPoint());
+      }
       result = result && unknownFields.equals(other.unknownFields);
       return result;
     }
@@ -15525,6 +15567,11 @@ public final class ClientProtos {
         hash = (37 * hash) + CF_TIME_RANGE_FIELD_NUMBER;
         hash = (53 * hash) + getCfTimeRangeList().hashCode();
       }
+      if (hasMvccReadPoint()) {
+        hash = (37 * hash) + MVCC_READ_POINT_FIELD_NUMBER;
+        hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashLong(
+            getMvccReadPoint());
+      }
       hash = (29 * hash) + unknownFields.hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -15716,6 +15763,8 @@ public final class ClientProtos {
         } else {
           cfTimeRangeBuilder_.clear();
         }
+        mvccReadPoint_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00080000);
         return this;
       }
 
@@ -15839,6 +15888,10 @@ public final class ClientProtos {
         } else {
           result.cfTimeRange_ = cfTimeRangeBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00080000) == 0x00080000)) {
+          to_bitField0_ |= 0x00010000;
+        }
+        result.mvccReadPoint_ = mvccReadPoint_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -16007,6 +16060,9 @@ public final class ClientProtos {
             }
           }
         }
+        if (other.hasMvccReadPoint()) {
+          setMvccReadPoint(other.getMvccReadPoint());
+        }
         this.mergeUnknownFields(other.unknownFields);
         onChanged();
         return this;
@@ -17484,6 +17540,38 @@ public final class ClientProtos {
         }
         return cfTimeRangeBuilder_;
       }
+
+      private long mvccReadPoint_ ;
+      /**
+       * optional uint64 mvcc_read_point = 20 [default = 0];
+       */
+      public boolean hasMvccReadPoint() {
+        return ((bitField0_ & 0x00080000) == 0x00080000);
+      }
+      /**
+       * optional uint64 mvcc_read_point = 20 [default = 0];
+       */
+      public long getMvccReadPoint() {
+        return mvccReadPoint_;
+      }
+      /**
+       * optional uint64 mvcc_read_point = 20 [default = 0];
+       */
+      public Builder setMvccReadPoint(long value) {
+        bitField0_ |= 0x00080000;
+        mvccReadPoint_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * optional uint64 mvcc_read_point = 20 [default = 0];
+       */
+      public Builder clearMvccReadPoint() {
+        bitField0_ = (bitField0_ & ~0x00080000);
+        mvccReadPoint_ = 0L;
+        onChanged();
+        return this;
+      }
       public final Builder setUnknownFields(
           final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) {
         return super.setUnknownFields(unknownFields);
@@ -19311,6 +19399,27 @@ public final class ClientProtos {
      * optional .hbase.pb.ScanMetrics scan_metrics = 10;
      */
     org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder();
+
+    /**
+     * 
+     * The mvcc read point which is used to open the scanner at server side. Client can
+     * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+     * of a row.
+     * 
+ * + * optional uint64 mvcc_read_point = 11 [default = 0]; + */ + boolean hasMvccReadPoint(); + /** + *
+     * The mvcc read point which is used to open the scanner at server side. Client can
+     * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+     * of a row.
+     * 
+ * + * optional uint64 mvcc_read_point = 11 [default = 0]; + */ + long getMvccReadPoint(); } /** *
@@ -19339,6 +19448,7 @@ public final class ClientProtos {
       partialFlagPerResult_ = java.util.Collections.emptyList();
       moreResultsInRegion_ = false;
       heartbeatMessage_ = false;
+      mvccReadPoint_ = 0L;
     }
 
     @java.lang.Override
@@ -19463,6 +19573,11 @@ public final class ClientProtos {
               bitField0_ |= 0x00000040;
               break;
             }
+            case 88: {
+              bitField0_ |= 0x00000080;
+              mvccReadPoint_ = input.readUInt64();
+              break;
+            }
           }
         }
       } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
@@ -19821,6 +19936,33 @@ public final class ClientProtos {
       return scanMetrics_ == null ? org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance() : scanMetrics_;
     }
 
+    public static final int MVCC_READ_POINT_FIELD_NUMBER = 11;
+    private long mvccReadPoint_;
+    /**
+     * 
+     * The mvcc read point which is used to open the scanner at server side. Client can
+     * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+     * of a row.
+     * 
+ * + * optional uint64 mvcc_read_point = 11 [default = 0]; + */ + public boolean hasMvccReadPoint() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + *
+     * The mvcc read point which is used to open the scanner at server side. Client can
+     * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+     * of a row.
+     * 
+ * + * optional uint64 mvcc_read_point = 11 [default = 0]; + */ + public long getMvccReadPoint() { + return mvccReadPoint_; + } + private byte memoizedIsInitialized = -1; public final boolean isInitialized() { byte isInitialized = memoizedIsInitialized; @@ -19863,6 +20005,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeMessage(10, getScanMetrics()); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeUInt64(11, mvccReadPoint_); + } unknownFields.writeTo(output); } @@ -19918,6 +20063,10 @@ public final class ClientProtos { size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream .computeMessageSize(10, getScanMetrics()); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream + .computeUInt64Size(11, mvccReadPoint_); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -19976,6 +20125,11 @@ public final class ClientProtos { result = result && getScanMetrics() .equals(other.getScanMetrics()); } + result = result && (hasMvccReadPoint() == other.hasMvccReadPoint()); + if (hasMvccReadPoint()) { + result = result && (getMvccReadPoint() + == other.getMvccReadPoint()); + } result = result && unknownFields.equals(other.unknownFields); return result; } @@ -20032,6 +20186,11 @@ public final class ClientProtos { hash = (37 * hash) + SCAN_METRICS_FIELD_NUMBER; hash = (53 * hash) + getScanMetrics().hashCode(); } + if (hasMvccReadPoint()) { + hash = (37 * hash) + MVCC_READ_POINT_FIELD_NUMBER; + hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashLong( + getMvccReadPoint()); + } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -20186,6 +20345,8 @@ public final class ClientProtos { scanMetricsBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000200); + mvccReadPoint_ = 0L; + bitField0_ = (bitField0_ & ~0x00000400); return this; } @@ -20261,6 +20422,10 @@ public final class ClientProtos { } else { result.scanMetrics_ = scanMetricsBuilder_.build(); } + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000080; + } + result.mvccReadPoint_ = mvccReadPoint_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -20370,6 +20535,9 @@ public final class ClientProtos { if (other.hasScanMetrics()) { mergeScanMetrics(other.getScanMetrics()); } + if (other.hasMvccReadPoint()) { + setMvccReadPoint(other.getMvccReadPoint()); + } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -21433,6 +21601,62 @@ public final class ClientProtos { } return scanMetricsBuilder_; } + + private long mvccReadPoint_ ; + /** + *
+       * The mvcc read point which is used to open the scanner at server side. Client can
+       * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+       * of a row.
+       * 
+ * + * optional uint64 mvcc_read_point = 11 [default = 0]; + */ + public boolean hasMvccReadPoint() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + *
+       * The mvcc read point which is used to open the scanner at server side. Client can
+       * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+       * of a row.
+       * 
+ * + * optional uint64 mvcc_read_point = 11 [default = 0]; + */ + public long getMvccReadPoint() { + return mvccReadPoint_; + } + /** + *
+       * The mvcc read point which is used to open the scanner at server side. Client can
+       * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+       * of a row.
+       * 
+ * + * optional uint64 mvcc_read_point = 11 [default = 0]; + */ + public Builder setMvccReadPoint(long value) { + bitField0_ |= 0x00000400; + mvccReadPoint_ = value; + onChanged(); + return this; + } + /** + *
+       * The mvcc read point which is used to open the scanner at server side. Client can
+       * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+       * of a row.
+       * 
+ * + * optional uint64 mvcc_read_point = 11 [default = 0]; + */ + public Builder clearMvccReadPoint() { + bitField0_ = (bitField0_ & ~0x00000400); + mvccReadPoint_ = 0L; + onChanged(); + return this; + } public final Builder setUnknownFields( final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) { return super.setUnknownFields(unknownFields); @@ -40434,7 +40658,7 @@ public final class ClientProtos { "tion\030\003 \001(\0132\023.hbase.pb.Condition\022\023\n\013nonce" + "_group\030\004 \001(\004\"E\n\016MutateResponse\022 \n\006result" + "\030\001 \001(\0132\020.hbase.pb.Result\022\021\n\tprocessed\030\002 " + - "\001(\010\"\275\004\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." + + "\001(\010\"\331\004\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." + "Column\022*\n\tattribute\030\002 \003(\0132\027.hbase.pb.Nam" + "eBytesPair\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_ro" + "w\030\004 \001(\014\022 \n\006filter\030\005 \001(\0132\020.hbase.pb.Filte" + @@ -40448,96 +40672,98 @@ public final class ClientProtos { " \001(\0162\025.hbase.pb.Consistency:\006STRONG\022\017\n\007c" + "aching\030\021 \001(\r\022\035\n\025allow_partial_results\030\022 " + "\001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.Co" + - "lumnFamilyTimeRange\"\246\002\n\013ScanRequest\022)\n\006r", - "egion\030\001 \001(\0132\031.hbase.pb.RegionSpecifier\022\034" + - "\n\004scan\030\002 \001(\0132\016.hbase.pb.Scan\022\022\n\nscanner_" + - "id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclos" + - "e_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037" + - "\n\027client_handles_partials\030\007 \001(\010\022!\n\031clien" + - "t_handles_heartbeats\030\010 \001(\010\022\032\n\022track_scan" + - "_metrics\030\t \001(\010\022\024\n\005renew\030\n \001(\010:\005false\"\232\002\n" + - "\014ScanResponse\022\030\n\020cells_per_result\030\001 \003(\r\022" + - "\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(" + - "\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030\005 \003(\0132\020.hbase.", - "pb.Result\022\r\n\005stale\030\006 \001(\010\022\037\n\027partial_flag" + - "_per_result\030\007 \003(\010\022\036\n\026more_results_in_reg" + - "ion\030\010 \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014" + - "scan_metrics\030\n \001(\0132\025.hbase.pb.ScanMetric" + - "s\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" + - "(\0132\031.hbase.pb.RegionSpecifier\022>\n\013family_" + - "path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReque" + - "st.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022+\n" + - "\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationTok" + - "en\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 \001(", - "\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014" + - "\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n" + - "\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\niden" + - "tifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003" + - " \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLoad" + - "Request\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.T" + - "ableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Regi" + - "onSpecifier\"-\n\027PrepareBulkLoadResponse\022\022" + - "\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadReq" + - "uest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001(\013", - "2\031.hbase.pb.RegionSpecifier\"\031\n\027CleanupBu" + - "lkLoadResponse\"a\n\026CoprocessorServiceCall" + - "\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013m" + - "ethod_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030Cop" + - "rocessorServiceResult\022&\n\005value\030\001 \001(\0132\027.h" + - "base.pb.NameBytesPair\"v\n\031CoprocessorServ" + - "iceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Re" + - "gionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb.C" + - "oprocessorServiceCall\"o\n\032CoprocessorServ" + - "iceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R", - "egionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase.pb" + - ".NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001(\r" + - "\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.MutationPr" + - "oto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014servi" + - "ce_call\030\004 \001(\0132 .hbase.pb.CoprocessorServ" + - "iceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(\0132" + - "\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002 \001" + - "(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c\n\017" + - "RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010" + - "\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compaction", - "Pressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadStat" + - "s\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpeci" + - "fier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLoad" + - "Stats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001 \001" + - "(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*\n\t" + - "exception\030\003 \001(\0132\027.hbase.pb.NameBytesPair" + - "\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Copr" + - "ocessorServiceResult\0220\n\tloadStats\030\005 \001(\0132" + - "\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Region" + - "ActionResult\0226\n\021resultOrException\030\001 \003(\0132", - "\033.hbase.pb.ResultOrException\022*\n\texceptio" + - "n\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mult" + - "iRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase.p" + - "b.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tco" + - "ndition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n\rM" + - "ultiResponse\0228\n\022regionActionResult\030\001 \003(\013" + - "2\034.hbase.pb.RegionActionResult\022\021\n\tproces" + - "sed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036.hb" + - "ase.pb.MultiRegionLoadStats*\'\n\013Consisten" + - "cy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClientS", - "ervice\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025.hb" + - "ase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.pb." + - "MutateRequest\032\030.hbase.pb.MutateResponse\022" + - "5\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase.p" + - "b.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbase." + - "pb.BulkLoadHFileRequest\032\037.hbase.pb.BulkL" + - "oadHFileResponse\022V\n\017PrepareBulkLoad\022 .hb" + - "ase.pb.PrepareBulkLoadRequest\032!.hbase.pb" + - ".PrepareBulkLoadResponse\022V\n\017CleanupBulkL" + - "oad\022 .hbase.pb.CleanupBulkLoadRequest\032!.", - "hbase.pb.CleanupBulkLoadResponse\022X\n\013Exec" + - "Service\022#.hbase.pb.CoprocessorServiceReq" + - "uest\032$.hbase.pb.CoprocessorServiceRespon" + - "se\022d\n\027ExecRegionServerService\022#.hbase.pb" + - ".CoprocessorServiceRequest\032$.hbase.pb.Co" + - "processorServiceResponse\0228\n\005Multi\022\026.hbas" + - "e.pb.MultiRequest\032\027.hbase.pb.MultiRespon" + - "seBI\n1org.apache.hadoop.hbase.shaded.pro" + - "tobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001" + "lumnFamilyTimeRange\022\032\n\017mvcc_read_point\030\024", + " \001(\004:\0010\"\246\002\n\013ScanRequest\022)\n\006region\030\001 \001(\0132" + + "\031.hbase.pb.RegionSpecifier\022\034\n\004scan\030\002 \001(\013" + + "2\016.hbase.pb.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016" + + "number_of_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 " + + "\001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037\n\027client_han" + + "dles_partials\030\007 \001(\010\022!\n\031client_handles_he" + + "artbeats\030\010 \001(\010\022\032\n\022track_scan_metrics\030\t \001" + + "(\010\022\024\n\005renew\030\n \001(\010:\005false\"\266\002\n\014ScanRespons" + + "e\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscanner_i" + + "d\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001", + "(\r\022!\n\007results\030\005 \003(\0132\020.hbase.pb.Result\022\r\n" + + "\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result\030" + + "\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n" + + "\021heartbeat_message\030\t \001(\010\022+\n\014scan_metrics" + + "\030\n \001(\0132\025.hbase.pb.ScanMetrics\022\032\n\017mvcc_re" + + "ad_point\030\013 \001(\004:\0010\"\240\002\n\024BulkLoadHFileReque" + + "st\022)\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpec" + + "ifier\022>\n\013family_path\030\002 \003(\0132).hbase.pb.Bu" + + "lkLoadHFileRequest.FamilyPath\022\026\n\016assign_" + + "seq_num\030\003 \001(\010\022+\n\010fs_token\030\004 \001(\0132\031.hbase.", + "pb.DelegationToken\022\022\n\nbulk_token\030\005 \001(\t\022\030" + + "\n\tcopy_file\030\006 \001(\010:\005false\032*\n\nFamilyPath\022\016" + + "\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoad" + + "HFileResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017Delegat" + + "ionToken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password" + + "\030\002 \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n" + + "\026PrepareBulkLoadRequest\022\'\n\ntable_name\030\001 " + + "\002(\0132\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\013" + + "2\031.hbase.pb.RegionSpecifier\"-\n\027PrepareBu" + + "lkLoadResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Cl", + "eanupBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t" + + "\022)\n\006region\030\002 \001(\0132\031.hbase.pb.RegionSpecif" + + "ier\"\031\n\027CleanupBulkLoadResponse\"a\n\026Coproc" + + "essorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service" + + "_name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007requ" + + "est\030\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n" + + "\005value\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n" + + "\031CoprocessorServiceRequest\022)\n\006region\030\001 \002" + + "(\0132\031.hbase.pb.RegionSpecifier\022.\n\004call\030\002 " + + "\002(\0132 .hbase.pb.CoprocessorServiceCall\"o\n", + "\032CoprocessorServiceResponse\022)\n\006region\030\001 " + + "\002(\0132\031.hbase.pb.RegionSpecifier\022&\n\005value\030" + + "\002 \002(\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Actio" + + "n\022\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hba" + + "se.pb.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase" + + ".pb.Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb" + + ".CoprocessorServiceCall\"k\n\014RegionAction\022" + + ")\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifi" + + "er\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hba" + + "se.pb.Action\"c\n\017RegionLoadStats\022\027\n\014memst", + "oreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:" + + "\0010\022\035\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024Mul" + + "tiRegionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbas" + + "e.pb.RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hba" + + "se.pb.RegionLoadStats\"\336\001\n\021ResultOrExcept" + + "ion\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hba" + + "se.pb.Result\022*\n\texception\030\003 \001(\0132\027.hbase." + + "pb.NameBytesPair\022:\n\016service_result\030\004 \001(\013" + + "2\".hbase.pb.CoprocessorServiceResult\0220\n\t" + + "loadStats\030\005 \001(\0132\031.hbase.pb.RegionLoadSta", + "tsB\002\030\001\"x\n\022RegionActionResult\0226\n\021resultOr" + + "Exception\030\001 \003(\0132\033.hbase.pb.ResultOrExcep" + + "tion\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameB" + + "ytesPair\"x\n\014MultiRequest\022,\n\014regionAction" + + "\030\001 \003(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceG" + + "roup\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb" + + ".Condition\"\226\001\n\rMultiResponse\0228\n\022regionAc" + + "tionResult\030\001 \003(\0132\034.hbase.pb.RegionAction" + + "Result\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStati" + + "stics\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadSt", + "ats*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELI" + + "NE\020\0012\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb" + + ".GetRequest\032\025.hbase.pb.GetResponse\022;\n\006Mu" + + "tate\022\027.hbase.pb.MutateRequest\032\030.hbase.pb" + + ".MutateResponse\0225\n\004Scan\022\025.hbase.pb.ScanR" + + "equest\032\026.hbase.pb.ScanResponse\022P\n\rBulkLo" + + "adHFile\022\036.hbase.pb.BulkLoadHFileRequest\032" + + "\037.hbase.pb.BulkLoadHFileResponse\022V\n\017Prep" + + "areBulkLoad\022 .hbase.pb.PrepareBulkLoadRe" + + "quest\032!.hbase.pb.PrepareBulkLoadResponse", + "\022V\n\017CleanupBulkLoad\022 .hbase.pb.CleanupBu" + + "lkLoadRequest\032!.hbase.pb.CleanupBulkLoad" + + "Response\022X\n\013ExecService\022#.hbase.pb.Copro" + + "cessorServiceRequest\032$.hbase.pb.Coproces" + + "sorServiceResponse\022d\n\027ExecRegionServerSe" + + "rvice\022#.hbase.pb.CoprocessorServiceReque" + + "st\032$.hbase.pb.CoprocessorServiceResponse" + + "\0228\n\005Multi\022\026.hbase.pb.MultiRequest\032\027.hbas" + + "e.pb.MultiResponseBI\n1org.apache.hadoop." + + "hbase.shaded.protobuf.generatedB\014ClientP", + "rotosH\001\210\001\001\240\001\001" }; org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { @@ -40639,7 +40865,7 @@ public final class ClientProtos { internal_static_hbase_pb_Scan_fieldAccessorTable = new org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_hbase_pb_Scan_descriptor, - new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", }); + new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", }); internal_static_hbase_pb_ScanRequest_descriptor = getDescriptor().getMessageTypes().get(12); internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new @@ -40651,7 +40877,7 @@ public final class ClientProtos { internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_hbase_pb_ScanResponse_descriptor, - new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", "ScanMetrics", }); + new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", "ScanMetrics", "MvccReadPoint", }); internal_static_hbase_pb_BulkLoadHFileRequest_descriptor = getDescriptor().getMessageTypes().get(14); internal_static_hbase_pb_BulkLoadHFileRequest_fieldAccessorTable = new diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto index 2feaa26..d68886b 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Client.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto @@ -255,6 +255,7 @@ message Scan { optional uint32 caching = 17; optional bool allow_partial_results = 18; repeated ColumnFamilyTimeRange cf_time_range = 19; + optional uint64 mvcc_read_point = 20 [default = 0]; } /** @@ -328,6 +329,11 @@ message ScanResponse { // The metrics tracked here are sent back to the client to be tracked together with // the existing client side metrics. optional ScanMetrics scan_metrics = 10; + + // The mvcc read point which is used to open the scanner at server side. Client can + // make use of this mvcc_read_point when restarting a scanner to get a consistent view + // of a row. + optional uint64 mvcc_read_point = 11 [default = 0]; } /** diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index dc050e8..c35617b 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -14219,6 +14219,16 @@ public final class ClientProtos { */ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ColumnFamilyTimeRangeOrBuilder getCfTimeRangeOrBuilder( int index); + + // optional uint64 mvcc_read_point = 20 [default = 0]; + /** + * optional uint64 mvcc_read_point = 20 [default = 0]; + */ + boolean hasMvccReadPoint(); + /** + * optional uint64 mvcc_read_point = 20 [default = 0]; + */ + long getMvccReadPoint(); } /** * Protobuf type {@code hbase.pb.Scan} @@ -14408,6 +14418,11 @@ public final class ClientProtos { cfTimeRange_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ColumnFamilyTimeRange.PARSER, extensionRegistry)); break; } + case 160: { + bitField0_ |= 0x00010000; + mvccReadPoint_ = input.readUInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -14841,6 +14856,22 @@ public final class ClientProtos { return cfTimeRange_.get(index); } + // optional uint64 mvcc_read_point = 20 [default = 0]; + public static final int MVCC_READ_POINT_FIELD_NUMBER = 20; + private long mvccReadPoint_; + /** + * optional uint64 mvcc_read_point = 20 [default = 0]; + */ + public boolean hasMvccReadPoint() { + return ((bitField0_ & 0x00010000) == 0x00010000); + } + /** + * optional uint64 mvcc_read_point = 20 [default = 0]; + */ + public long getMvccReadPoint() { + return mvccReadPoint_; + } + private void initFields() { column_ = java.util.Collections.emptyList(); attribute_ = java.util.Collections.emptyList(); @@ -14861,6 +14892,7 @@ public final class ClientProtos { caching_ = 0; allowPartialResults_ = false; cfTimeRange_ = java.util.Collections.emptyList(); + mvccReadPoint_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -14955,6 +14987,9 @@ public final class ClientProtos { for (int i = 0; i < cfTimeRange_.size(); i++) { output.writeMessage(19, cfTimeRange_.get(i)); } + if (((bitField0_ & 0x00010000) == 0x00010000)) { + output.writeUInt64(20, mvccReadPoint_); + } getUnknownFields().writeTo(output); } @@ -15040,6 +15075,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(19, cfTimeRange_.get(i)); } + if (((bitField0_ & 0x00010000) == 0x00010000)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(20, mvccReadPoint_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -15149,6 +15188,11 @@ public final class ClientProtos { } result = result && getCfTimeRangeList() .equals(other.getCfTimeRangeList()); + result = result && (hasMvccReadPoint() == other.hasMvccReadPoint()); + if (hasMvccReadPoint()) { + result = result && (getMvccReadPoint() + == other.getMvccReadPoint()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -15238,6 +15282,10 @@ public final class ClientProtos { hash = (37 * hash) + CF_TIME_RANGE_FIELD_NUMBER; hash = (53 * hash) + getCfTimeRangeList().hashCode(); } + if (hasMvccReadPoint()) { + hash = (37 * hash) + MVCC_READ_POINT_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getMvccReadPoint()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -15421,6 +15469,8 @@ public final class ClientProtos { } else { cfTimeRangeBuilder_.clear(); } + mvccReadPoint_ = 0L; + bitField0_ = (bitField0_ & ~0x00080000); return this; } @@ -15548,6 +15598,10 @@ public final class ClientProtos { } else { result.cfTimeRange_ = cfTimeRangeBuilder_.build(); } + if (((from_bitField0_ & 0x00080000) == 0x00080000)) { + to_bitField0_ |= 0x00010000; + } + result.mvccReadPoint_ = mvccReadPoint_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -15690,6 +15744,9 @@ public final class ClientProtos { } } } + if (other.hasMvccReadPoint()) { + setMvccReadPoint(other.getMvccReadPoint()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -17182,6 +17239,39 @@ public final class ClientProtos { return cfTimeRangeBuilder_; } + // optional uint64 mvcc_read_point = 20 [default = 0]; + private long mvccReadPoint_ ; + /** + * optional uint64 mvcc_read_point = 20 [default = 0]; + */ + public boolean hasMvccReadPoint() { + return ((bitField0_ & 0x00080000) == 0x00080000); + } + /** + * optional uint64 mvcc_read_point = 20 [default = 0]; + */ + public long getMvccReadPoint() { + return mvccReadPoint_; + } + /** + * optional uint64 mvcc_read_point = 20 [default = 0]; + */ + public Builder setMvccReadPoint(long value) { + bitField0_ |= 0x00080000; + mvccReadPoint_ = value; + onChanged(); + return this; + } + /** + * optional uint64 mvcc_read_point = 20 [default = 0]; + */ + public Builder clearMvccReadPoint() { + bitField0_ = (bitField0_ & ~0x00080000); + mvccReadPoint_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.Scan) } @@ -18972,6 +19062,28 @@ public final class ClientProtos { *
*/ org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder(); + + // optional uint64 mvcc_read_point = 11 [default = 0]; + /** + * optional uint64 mvcc_read_point = 11 [default = 0]; + * + *
+     * The mvcc read point which is used to open the scanner at server side. Client can
+     * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+     * of a row.
+     * 
+ */ + boolean hasMvccReadPoint(); + /** + * optional uint64 mvcc_read_point = 11 [default = 0]; + * + *
+     * The mvcc read point which is used to open the scanner at server side. Client can
+     * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+     * of a row.
+     * 
+ */ + long getMvccReadPoint(); } /** * Protobuf type {@code hbase.pb.ScanResponse} @@ -19123,6 +19235,11 @@ public final class ClientProtos { bitField0_ |= 0x00000040; break; } + case 88: { + bitField0_ |= 0x00000080; + mvccReadPoint_ = input.readUInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -19506,6 +19623,34 @@ public final class ClientProtos { return scanMetrics_; } + // optional uint64 mvcc_read_point = 11 [default = 0]; + public static final int MVCC_READ_POINT_FIELD_NUMBER = 11; + private long mvccReadPoint_; + /** + * optional uint64 mvcc_read_point = 11 [default = 0]; + * + *
+     * The mvcc read point which is used to open the scanner at server side. Client can
+     * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+     * of a row.
+     * 
+ */ + public boolean hasMvccReadPoint() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional uint64 mvcc_read_point = 11 [default = 0]; + * + *
+     * The mvcc read point which is used to open the scanner at server side. Client can
+     * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+     * of a row.
+     * 
+ */ + public long getMvccReadPoint() { + return mvccReadPoint_; + } + private void initFields() { cellsPerResult_ = java.util.Collections.emptyList(); scannerId_ = 0L; @@ -19517,6 +19662,7 @@ public final class ClientProtos { moreResultsInRegion_ = false; heartbeatMessage_ = false; scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance(); + mvccReadPoint_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -19560,6 +19706,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeMessage(10, scanMetrics_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeUInt64(11, mvccReadPoint_); + } getUnknownFields().writeTo(output); } @@ -19616,6 +19765,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(10, scanMetrics_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(11, mvccReadPoint_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -19680,6 +19833,11 @@ public final class ClientProtos { result = result && getScanMetrics() .equals(other.getScanMetrics()); } + result = result && (hasMvccReadPoint() == other.hasMvccReadPoint()); + if (hasMvccReadPoint()) { + result = result && (getMvccReadPoint() + == other.getMvccReadPoint()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -19733,6 +19891,10 @@ public final class ClientProtos { hash = (37 * hash) + SCAN_METRICS_FIELD_NUMBER; hash = (53 * hash) + getScanMetrics().hashCode(); } + if (hasMvccReadPoint()) { + hash = (37 * hash) + MVCC_READ_POINT_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getMvccReadPoint()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -19878,6 +20040,8 @@ public final class ClientProtos { scanMetricsBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000200); + mvccReadPoint_ = 0L; + bitField0_ = (bitField0_ & ~0x00000400); return this; } @@ -19957,6 +20121,10 @@ public final class ClientProtos { } else { result.scanMetrics_ = scanMetricsBuilder_.build(); } + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000080; + } + result.mvccReadPoint_ = mvccReadPoint_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -20040,6 +20208,9 @@ public final class ClientProtos { if (other.hasScanMetrics()) { mergeScanMetrics(other.getScanMetrics()); } + if (other.hasMvccReadPoint()) { + setMvccReadPoint(other.getMvccReadPoint()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -21108,6 +21279,63 @@ public final class ClientProtos { return scanMetricsBuilder_; } + // optional uint64 mvcc_read_point = 11 [default = 0]; + private long mvccReadPoint_ ; + /** + * optional uint64 mvcc_read_point = 11 [default = 0]; + * + *
+       * The mvcc read point which is used to open the scanner at server side. Client can
+       * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+       * of a row.
+       * 
+ */ + public boolean hasMvccReadPoint() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * optional uint64 mvcc_read_point = 11 [default = 0]; + * + *
+       * The mvcc read point which is used to open the scanner at server side. Client can
+       * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+       * of a row.
+       * 
+ */ + public long getMvccReadPoint() { + return mvccReadPoint_; + } + /** + * optional uint64 mvcc_read_point = 11 [default = 0]; + * + *
+       * The mvcc read point which is used to open the scanner at server side. Client can
+       * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+       * of a row.
+       * 
+ */ + public Builder setMvccReadPoint(long value) { + bitField0_ |= 0x00000400; + mvccReadPoint_ = value; + onChanged(); + return this; + } + /** + * optional uint64 mvcc_read_point = 11 [default = 0]; + * + *
+       * The mvcc read point which is used to open the scanner at server side. Client can
+       * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+       * of a row.
+       * 
+ */ + public Builder clearMvccReadPoint() { + bitField0_ = (bitField0_ & ~0x00000400); + mvccReadPoint_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.ScanResponse) } @@ -39504,7 +39732,7 @@ public final class ClientProtos { "tion\030\003 \001(\0132\023.hbase.pb.Condition\022\023\n\013nonce" + "_group\030\004 \001(\004\"E\n\016MutateResponse\022 \n\006result" + "\030\001 \001(\0132\020.hbase.pb.Result\022\021\n\tprocessed\030\002 " + - "\001(\010\"\275\004\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." + + "\001(\010\"\331\004\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." + "Column\022*\n\tattribute\030\002 \003(\0132\027.hbase.pb.Nam" + "eBytesPair\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_ro" + "w\030\004 \001(\014\022 \n\006filter\030\005 \001(\0132\020.hbase.pb.Filte" + @@ -39518,96 +39746,98 @@ public final class ClientProtos { " \001(\0162\025.hbase.pb.Consistency:\006STRONG\022\017\n\007c" + "aching\030\021 \001(\r\022\035\n\025allow_partial_results\030\022 " + "\001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.Co" + - "lumnFamilyTimeRange\"\246\002\n\013ScanRequest\022)\n\006r", - "egion\030\001 \001(\0132\031.hbase.pb.RegionSpecifier\022\034" + - "\n\004scan\030\002 \001(\0132\016.hbase.pb.Scan\022\022\n\nscanner_" + - "id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclos" + - "e_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037" + - "\n\027client_handles_partials\030\007 \001(\010\022!\n\031clien" + - "t_handles_heartbeats\030\010 \001(\010\022\032\n\022track_scan" + - "_metrics\030\t \001(\010\022\024\n\005renew\030\n \001(\010:\005false\"\232\002\n" + - "\014ScanResponse\022\030\n\020cells_per_result\030\001 \003(\r\022" + - "\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(" + - "\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030\005 \003(\0132\020.hbase.", - "pb.Result\022\r\n\005stale\030\006 \001(\010\022\037\n\027partial_flag" + - "_per_result\030\007 \003(\010\022\036\n\026more_results_in_reg" + - "ion\030\010 \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014" + - "scan_metrics\030\n \001(\0132\025.hbase.pb.ScanMetric" + - "s\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" + - "(\0132\031.hbase.pb.RegionSpecifier\022>\n\013family_" + - "path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReque" + - "st.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022+\n" + - "\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationTok" + - "en\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 \001(", - "\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014" + - "\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n" + - "\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\niden" + - "tifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003" + - " \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLoad" + - "Request\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.T" + - "ableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Regi" + - "onSpecifier\"-\n\027PrepareBulkLoadResponse\022\022" + - "\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadReq" + - "uest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001(\013", - "2\031.hbase.pb.RegionSpecifier\"\031\n\027CleanupBu" + - "lkLoadResponse\"a\n\026CoprocessorServiceCall" + - "\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013m" + - "ethod_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030Cop" + - "rocessorServiceResult\022&\n\005value\030\001 \001(\0132\027.h" + - "base.pb.NameBytesPair\"v\n\031CoprocessorServ" + - "iceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Re" + - "gionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb.C" + - "oprocessorServiceCall\"o\n\032CoprocessorServ" + - "iceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R", - "egionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase.pb" + - ".NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001(\r" + - "\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.MutationPr" + - "oto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014servi" + - "ce_call\030\004 \001(\0132 .hbase.pb.CoprocessorServ" + - "iceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(\0132" + - "\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002 \001" + - "(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c\n\017" + - "RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010" + - "\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compaction", - "Pressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadStat" + - "s\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpeci" + - "fier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLoad" + - "Stats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001 \001" + - "(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*\n\t" + - "exception\030\003 \001(\0132\027.hbase.pb.NameBytesPair" + - "\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Copr" + - "ocessorServiceResult\0220\n\tloadStats\030\005 \001(\0132" + - "\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Region" + - "ActionResult\0226\n\021resultOrException\030\001 \003(\0132", - "\033.hbase.pb.ResultOrException\022*\n\texceptio" + - "n\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mult" + - "iRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase.p" + - "b.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tco" + - "ndition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n\rM" + - "ultiResponse\0228\n\022regionActionResult\030\001 \003(\013" + - "2\034.hbase.pb.RegionActionResult\022\021\n\tproces" + - "sed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036.hb" + - "ase.pb.MultiRegionLoadStats*\'\n\013Consisten" + - "cy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClientS", - "ervice\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025.hb" + - "ase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.pb." + - "MutateRequest\032\030.hbase.pb.MutateResponse\022" + - "5\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase.p" + - "b.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbase." + - "pb.BulkLoadHFileRequest\032\037.hbase.pb.BulkL" + - "oadHFileResponse\022V\n\017PrepareBulkLoad\022 .hb" + - "ase.pb.PrepareBulkLoadRequest\032!.hbase.pb" + - ".PrepareBulkLoadResponse\022V\n\017CleanupBulkL" + - "oad\022 .hbase.pb.CleanupBulkLoadRequest\032!.", - "hbase.pb.CleanupBulkLoadResponse\022X\n\013Exec" + - "Service\022#.hbase.pb.CoprocessorServiceReq" + - "uest\032$.hbase.pb.CoprocessorServiceRespon" + - "se\022d\n\027ExecRegionServerService\022#.hbase.pb" + - ".CoprocessorServiceRequest\032$.hbase.pb.Co" + - "processorServiceResponse\0228\n\005Multi\022\026.hbas" + - "e.pb.MultiRequest\032\027.hbase.pb.MultiRespon" + - "seBB\n*org.apache.hadoop.hbase.protobuf.g" + - "eneratedB\014ClientProtosH\001\210\001\001\240\001\001" + "lumnFamilyTimeRange\022\032\n\017mvcc_read_point\030\024", + " \001(\004:\0010\"\246\002\n\013ScanRequest\022)\n\006region\030\001 \001(\0132" + + "\031.hbase.pb.RegionSpecifier\022\034\n\004scan\030\002 \001(\013" + + "2\016.hbase.pb.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016" + + "number_of_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 " + + "\001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037\n\027client_han" + + "dles_partials\030\007 \001(\010\022!\n\031client_handles_he" + + "artbeats\030\010 \001(\010\022\032\n\022track_scan_metrics\030\t \001" + + "(\010\022\024\n\005renew\030\n \001(\010:\005false\"\266\002\n\014ScanRespons" + + "e\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscanner_i" + + "d\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001", + "(\r\022!\n\007results\030\005 \003(\0132\020.hbase.pb.Result\022\r\n" + + "\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result\030" + + "\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n" + + "\021heartbeat_message\030\t \001(\010\022+\n\014scan_metrics" + + "\030\n \001(\0132\025.hbase.pb.ScanMetrics\022\032\n\017mvcc_re" + + "ad_point\030\013 \001(\004:\0010\"\240\002\n\024BulkLoadHFileReque" + + "st\022)\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpec" + + "ifier\022>\n\013family_path\030\002 \003(\0132).hbase.pb.Bu" + + "lkLoadHFileRequest.FamilyPath\022\026\n\016assign_" + + "seq_num\030\003 \001(\010\022+\n\010fs_token\030\004 \001(\0132\031.hbase.", + "pb.DelegationToken\022\022\n\nbulk_token\030\005 \001(\t\022\030" + + "\n\tcopy_file\030\006 \001(\010:\005false\032*\n\nFamilyPath\022\016" + + "\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoad" + + "HFileResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017Delegat" + + "ionToken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password" + + "\030\002 \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n" + + "\026PrepareBulkLoadRequest\022\'\n\ntable_name\030\001 " + + "\002(\0132\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\013" + + "2\031.hbase.pb.RegionSpecifier\"-\n\027PrepareBu" + + "lkLoadResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Cl", + "eanupBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t" + + "\022)\n\006region\030\002 \001(\0132\031.hbase.pb.RegionSpecif" + + "ier\"\031\n\027CleanupBulkLoadResponse\"a\n\026Coproc" + + "essorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service" + + "_name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007requ" + + "est\030\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n" + + "\005value\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n" + + "\031CoprocessorServiceRequest\022)\n\006region\030\001 \002" + + "(\0132\031.hbase.pb.RegionSpecifier\022.\n\004call\030\002 " + + "\002(\0132 .hbase.pb.CoprocessorServiceCall\"o\n", + "\032CoprocessorServiceResponse\022)\n\006region\030\001 " + + "\002(\0132\031.hbase.pb.RegionSpecifier\022&\n\005value\030" + + "\002 \002(\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Actio" + + "n\022\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hba" + + "se.pb.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase" + + ".pb.Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb" + + ".CoprocessorServiceCall\"k\n\014RegionAction\022" + + ")\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifi" + + "er\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hba" + + "se.pb.Action\"c\n\017RegionLoadStats\022\027\n\014memst", + "oreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:" + + "\0010\022\035\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024Mul" + + "tiRegionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbas" + + "e.pb.RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hba" + + "se.pb.RegionLoadStats\"\336\001\n\021ResultOrExcept" + + "ion\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hba" + + "se.pb.Result\022*\n\texception\030\003 \001(\0132\027.hbase." + + "pb.NameBytesPair\022:\n\016service_result\030\004 \001(\013" + + "2\".hbase.pb.CoprocessorServiceResult\0220\n\t" + + "loadStats\030\005 \001(\0132\031.hbase.pb.RegionLoadSta", + "tsB\002\030\001\"x\n\022RegionActionResult\0226\n\021resultOr" + + "Exception\030\001 \003(\0132\033.hbase.pb.ResultOrExcep" + + "tion\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameB" + + "ytesPair\"x\n\014MultiRequest\022,\n\014regionAction" + + "\030\001 \003(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceG" + + "roup\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb" + + ".Condition\"\226\001\n\rMultiResponse\0228\n\022regionAc" + + "tionResult\030\001 \003(\0132\034.hbase.pb.RegionAction" + + "Result\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStati" + + "stics\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadSt", + "ats*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELI" + + "NE\020\0012\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb" + + ".GetRequest\032\025.hbase.pb.GetResponse\022;\n\006Mu" + + "tate\022\027.hbase.pb.MutateRequest\032\030.hbase.pb" + + ".MutateResponse\0225\n\004Scan\022\025.hbase.pb.ScanR" + + "equest\032\026.hbase.pb.ScanResponse\022P\n\rBulkLo" + + "adHFile\022\036.hbase.pb.BulkLoadHFileRequest\032" + + "\037.hbase.pb.BulkLoadHFileResponse\022V\n\017Prep" + + "areBulkLoad\022 .hbase.pb.PrepareBulkLoadRe" + + "quest\032!.hbase.pb.PrepareBulkLoadResponse", + "\022V\n\017CleanupBulkLoad\022 .hbase.pb.CleanupBu" + + "lkLoadRequest\032!.hbase.pb.CleanupBulkLoad" + + "Response\022X\n\013ExecService\022#.hbase.pb.Copro" + + "cessorServiceRequest\032$.hbase.pb.Coproces" + + "sorServiceResponse\022d\n\027ExecRegionServerSe" + + "rvice\022#.hbase.pb.CoprocessorServiceReque" + + "st\032$.hbase.pb.CoprocessorServiceResponse" + + "\0228\n\005Multi\022\026.hbase.pb.MultiRequest\032\027.hbas" + + "e.pb.MultiResponseBB\n*org.apache.hadoop." + + "hbase.protobuf.generatedB\014ClientProtosH\001", + "\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -39697,7 +39927,7 @@ public final class ClientProtos { internal_static_hbase_pb_Scan_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_Scan_descriptor, - new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", }); + new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", }); internal_static_hbase_pb_ScanRequest_descriptor = getDescriptor().getMessageTypes().get(12); internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new @@ -39709,7 +39939,7 @@ public final class ClientProtos { internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_ScanResponse_descriptor, - new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", "ScanMetrics", }); + new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", "ScanMetrics", "MvccReadPoint", }); internal_static_hbase_pb_BulkLoadHFileRequest_descriptor = getDescriptor().getMessageTypes().get(14); internal_static_hbase_pb_BulkLoadHFileRequest_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 237b932..519d7ae 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -255,6 +255,7 @@ message Scan { optional uint32 caching = 17; optional bool allow_partial_results = 18; repeated ColumnFamilyTimeRange cf_time_range = 19; + optional uint64 mvcc_read_point = 20 [default = 0]; } /** @@ -328,6 +329,11 @@ message ScanResponse { // The metrics tracked here are sent back to the client to be tracked together with // the existing client side metrics. optional ScanMetrics scan_metrics = 10; + + // The mvcc read point which is used to open the scanner at server side. Client can + // make use of this mvcc_read_point when restarting a scanner to get a consistent view + // of a row. + optional uint64 mvcc_read_point = 11 [default = 0]; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index aa19342..19e1235 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -111,6 +111,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.Result; @@ -5739,8 +5740,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // synchronize on scannerReadPoints so that nobody calculates // getSmallestReadPoint, before scannerReadPoints is updated. IsolationLevel isolationLevel = scan.getIsolationLevel(); - synchronized(scannerReadPoints) { - if (nonce == HConstants.NO_NONCE || rsServices == null + long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan); + synchronized (scannerReadPoints) { + if (mvccReadPoint > 0) { + this.readPt = mvccReadPoint; + } else if (nonce == HConstants.NO_NONCE || rsServices == null || rsServices.getNonceManager() == null) { this.readPt = getReadPoint(isolationLevel); } else { @@ -5748,7 +5752,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } scannerReadPoints.put(this, this.readPt); } - initializeScanners(scan, additionalScanners); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index a61a9f2..85b7967 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2749,8 +2749,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, scannerName = String.valueOf(scannerId); rsh = addScanner(scannerName, scanner, region); ttl = this.scannerLeaseTimeoutPeriod; + builder.setMvccReadPoint(scanner.getMvccReadPoint()); } - assert scanner != null; if (request.hasRenew() && request.getRenew()) { rsh = scanners.get(scannerName); lease = regionServer.leases.removeLease(scannerName); @@ -2868,7 +2868,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } else { throw new UnsupportedOperationException("We only do " + - "PayloadCarryingRpcControllers! FIX IF A PROBLEM: " + controller); + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller); } } // Use half of whichever timeout value was more restrictive... But don't allow diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index 5c4ca13..b0a6137 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -734,12 +734,9 @@ public class TestPartialResultsFromClientSide { Table tmpTable = createTestTable(testName, rows, families, qualifiers, value); - Scan scan = new Scan(); - scan.setMaxResultSize(1); - scan.setAllowPartialResults(true); - // Open scanner before deletes - ResultScanner scanner = tmpTable.getScanner(scan); + ResultScanner scanner = + tmpTable.getScanner(new Scan().setMaxResultSize(1).setAllowPartialResults(true)); Delete delete1 = new Delete(rows[0]); delete1.addColumn(families[0], qualifiers[0], 0); @@ -756,13 +753,13 @@ public class TestPartialResultsFromClientSide { scannerCount == expectedCount); // Minus 2 for the two cells that were deleted - scanner = tmpTable.getScanner(scan); + scanner = tmpTable.getScanner(new Scan().setMaxResultSize(1).setAllowPartialResults(true)); scannerCount = countCellsFromScanner(scanner); expectedCount = numRows * numFamilies * numQualifiers - 2; assertTrue("scannerCount: " + scannerCount + " expectedCount: " + expectedCount, scannerCount == expectedCount); - scanner = tmpTable.getScanner(scan); + scanner = tmpTable.getScanner(new Scan().setMaxResultSize(1).setAllowPartialResults(true)); // Put in 2 new rows. The timestamps differ from the deleted rows Put put1 = new Put(rows[0]); put1.add(new KeyValue(rows[0], families[0], qualifiers[0], 1, value)); @@ -779,7 +776,7 @@ public class TestPartialResultsFromClientSide { scannerCount == expectedCount); // Now the scanner should see the cells that were added by puts - scanner = tmpTable.getScanner(scan); + scanner = tmpTable.getScanner(new Scan().setMaxResultSize(1).setAllowPartialResults(true)); scannerCount = countCellsFromScanner(scanner); expectedCount = numRows * numFamilies * numQualifiers; assertTrue("scannerCount: " + scannerCount + " expectedCount: " + expectedCount, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMvccConsistentScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMvccConsistentScanner.java new file mode 100644 index 0000000..eccb1f7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMvccConsistentScanner.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ LargeTests.class, ClientTests.class }) +public class TestMvccConsistentScanner { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static Connection CONN; + + private static final byte[] CF = Bytes.toBytes("cf"); + + private static final byte[] CQ1 = Bytes.toBytes("cq1"); + + private static final byte[] CQ2 = Bytes.toBytes("cq2"); + + private static final byte[] CQ3 = Bytes.toBytes("cq3"); + @Rule + public TestName testName = new TestName(); + + private TableName tableName; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniCluster(2); + CONN = ConnectionFactory.createConnection(UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + CONN.close(); + UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws IOException, InterruptedException { + tableName = TableName.valueOf(testName.getMethodName().replaceAll("[^0-9a-zA-Z]", "_")); + UTIL.createTable(tableName, CF); + UTIL.waitTableAvailable(tableName); + } + + private void put(byte[] row, byte[] cq, byte[] value) throws IOException { + try (Table table = CONN.getTable(tableName)) { + table.put(new Put(row).addColumn(CF, cq, value)); + } + } + + private void move() throws IOException, InterruptedException { + HRegionInfo region = + UTIL.getHBaseCluster().getRegions(tableName).stream().findAny().get().getRegionInfo(); + HRegionServer rs = + UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) + .filter(r -> !r.getOnlineTables().contains(tableName)).findAny().get(); + UTIL.getAdmin().move(region.getEncodedNameAsBytes(), + Bytes.toBytes(rs.getServerName().getServerName())); + while (UTIL.getRSForFirstRegionInTable(tableName) != rs) { + Thread.sleep(100); + } + } + + @Test + public void testRowAtomic() throws IOException, InterruptedException { + byte[] row = Bytes.toBytes("row"); + put(row, CQ1, Bytes.toBytes(1)); + put(row, CQ2, Bytes.toBytes(2)); + try (Table table = CONN.getTable(tableName); + ResultScanner scanner = table.getScanner(new Scan().setBatch(1).setCaching(1))) { + Result result = scanner.next(); + assertEquals(1, result.rawCells().length); + assertEquals(1, Bytes.toInt(result.getValue(CF, CQ1))); + move(); + put(row, CQ3, Bytes.toBytes(3)); + result = scanner.next(); + assertEquals(1, result.rawCells().length); + assertEquals(2, Bytes.toInt(result.getValue(CF, CQ2))); + assertNull(scanner.next()); + } + } + + @Test + public void testCrossRowAtomicInRegion() throws IOException, InterruptedException { + put(Bytes.toBytes("row1"), CQ1, Bytes.toBytes(1)); + put(Bytes.toBytes("row2"), CQ1, Bytes.toBytes(2)); + try (Table table = CONN.getTable(tableName); + ResultScanner scanner = table.getScanner(new Scan().setCaching(1))) { + Result result = scanner.next(); + assertArrayEquals(Bytes.toBytes("row1"), result.getRow()); + assertEquals(1, Bytes.toInt(result.getValue(CF, CQ1))); + move(); + put(Bytes.toBytes("row3"), CQ1, Bytes.toBytes(3)); + result = scanner.next(); + assertArrayEquals(Bytes.toBytes("row2"), result.getRow()); + assertEquals(2, Bytes.toInt(result.getValue(CF, CQ1))); + assertNull(scanner.next()); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java index 06dbc37..f8107d6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java @@ -440,10 +440,8 @@ public class TestTags { increment.add(new KeyValue(row2, f, q, 1234L, v)); increment.setAttribute("visibility", Bytes.toBytes("tag2")); table.increment(increment); - Scan scan = new Scan(); - scan.setStartRow(row2); TestCoprocessorForTags.checkTagPresence = true; - scanner = table.getScanner(scan); + scanner = table.getScanner(new Scan().setStartRow(row2)); result = scanner.next(); kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); tags = TestCoprocessorForTags.tags; @@ -462,10 +460,8 @@ public class TestTags { Append append = new Append(row3); append.add(f, q, Bytes.toBytes("b")); table.append(append); - scan = new Scan(); - scan.setStartRow(row3); TestCoprocessorForTags.checkTagPresence = true; - scanner = table.getScanner(scan); + scanner = table.getScanner(new Scan().setStartRow(row3)); result = scanner.next(); kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); tags = TestCoprocessorForTags.tags; @@ -479,7 +475,7 @@ public class TestTags { append.setAttribute("visibility", Bytes.toBytes("tag2")); table.append(append); TestCoprocessorForTags.checkTagPresence = true; - scanner = table.getScanner(scan); + scanner = table.getScanner(new Scan().setStartRow(row3)); result = scanner.next(); kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); tags = TestCoprocessorForTags.tags; @@ -502,10 +498,8 @@ public class TestTags { append.add(new KeyValue(row4, f, q, 1234L, v)); append.setAttribute("visibility", Bytes.toBytes("tag2")); table.append(append); - scan = new Scan(); - scan.setStartRow(row4); TestCoprocessorForTags.checkTagPresence = true; - scanner = table.getScanner(scan); + scanner = table.getScanner(new Scan().setStartRow(row4)); result = scanner.next(); kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q)); tags = TestCoprocessorForTags.tags; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index 62a94f1..4ae14b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -332,22 +332,16 @@ public class TestReplicationSink { } entries.add(builder.build()); - ResultScanner scanRes = null; - try { - Scan scan = new Scan(); - scanRes = table1.getScanner(scan); + try (ResultScanner scanner = table1.getScanner(new Scan())) { // 6. Assert no existing data in table - assertEquals(0, scanRes.next(numRows).length); - // 7. Replicate the bulk loaded entry - SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); - scanRes = table1.getScanner(scan); + assertEquals(0, scanner.next(numRows).length); + } + // 7. Replicate the bulk loaded entry + SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); + try (ResultScanner scanner = table1.getScanner(new Scan())) { // 8. Assert data is replicated - assertEquals(numRows, scanRes.next(numRows).length); - } finally { - if (scanRes != null) { - scanRes.close(); - } + assertEquals(numRows, scanner.next(numRows).length); } } -- 1.9.1