From 221fe01b174bf4df5755b9716e43608c28da9f3a Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Fri, 26 May 2017 19:09:01 +0800 Subject: [PATCH] HBASE-18122 Scanner id should include ServerName of region server --- .../hadoop/hbase/regionserver/RSRpcServices.java | 5 ++- .../hbase/regionserver/ScannerIdGenerator.java | 51 ++++++++++++++++++++++ 2 files changed, 54 insertions(+), 2 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerIdGenerator.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index b3ca94d2de..2d3ca9801b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -263,7 +263,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // The reference to the priority extraction function private final PriorityFunction priority; - private final AtomicLong scannerIdGen = new AtomicLong(0L); + private ScannerIdGenerator scannerIdGenerator; private final ConcurrentMap scanners = new ConcurrentHashMap<>(); /** @@ -1168,6 +1168,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); rpcServer.setErrorHandler(this); rs.setName(name); + this.scannerIdGenerator = new ScannerIdGenerator(rs.serverName); } @Override @@ -2854,7 +2855,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (region.getCoprocessorHost() != null) { scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); } - long scannerId = this.scannerIdGen.incrementAndGet(); + long scannerId = scannerIdGenerator.generateNewScannerId(); builder.setScannerId(scannerId); builder.setMvccReadPoint(scanner.getMvccReadPoint()); builder.setTtl(scannerLeaseTimeoutPeriod); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerIdGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerIdGenerator.java new file mode 100644 index 0000000000..fe719b1cab --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerIdGenerator.java @@ -0,0 +1,51 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.hash.Hashing; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Generate a new style scanner id to prevent collision with previous started server or other RSs. + * We have 64 bits to use. + * The first 32 bits are MurmurHash32 of ServerName string "host:port,ts". + * The ServerName contains both host, port, and start timestamp so it can prevent collision. + * The lowest 32bit is generated by atomic int. + */ +@InterfaceAudience.Private +public class ScannerIdGenerator { + + private long serverNameHash; + private AtomicInteger scannerIdGen = new AtomicInteger(0); + + public ScannerIdGenerator(ServerName serverName) { + this.serverNameHash = (long)Hashing.murmur3_32().hashString(serverName.toString()).asInt() << 32; + } + + public long generateNewScannerId() { + return (scannerIdGen.incrementAndGet() & 0x00000000FFFFFFFFL) | serverNameHash; + } + +} -- 2.11.0 (Apple Git-81)