From f803c1ed9a5690291d95ec0f7440923a84011a0f Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 31 Mar 2019 14:27:51 +0800 Subject: [PATCH] HBASE-22135 AsyncAdmin will not refresh master address --- .../hbase/client/AsyncConnectionImpl.java | 55 +++++++------------ .../AsyncMasterRequestRpcRetryingCaller.java | 14 ++++- .../hbase/client/AsyncMetaRegionLocator.java | 46 +--------------- .../hadoop/hbase/client/ConnectionUtils.java | 47 ++++++++++++++++ .../client/TestAsyncAdminMasterSwitch.java | 54 ++++++++++++++++++ 5 files changed, 135 insertions(+), 81 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminMasterSwitch.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index e47d4ccf1e..015cb6c8c7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -235,45 +235,30 @@ class AsyncConnectionImpl implements AsyncConnection { } CompletableFuture getMasterStub() { - MasterService.Interface masterStub = this.masterStub.get(); - - if (masterStub == null) { - for (;;) { - if (this.masterStubMakeFuture.compareAndSet(null, new CompletableFuture<>())) { - CompletableFuture future = this.masterStubMakeFuture.get(); - makeMasterStub(future); + return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> { + CompletableFuture future = new CompletableFuture<>(); + addListener(registry.getMasterAddress(), (addr, error) -> { + if (error != null) { + future.completeExceptionally(error); + } else if (addr == null) { + future.completeExceptionally(new MasterNotRunningException( + "ZooKeeper available but no active master location found")); } else { - CompletableFuture future = this.masterStubMakeFuture.get(); - if (future != null) { - return future; + LOG.debug("The fetched master address is {}", addr); + try { + future.complete(createMasterStub(addr)); + } catch (IOException e) { + future.completeExceptionally(e); } } - } - } - for (;;) { - if (masterStubMakeFuture.compareAndSet(null, new CompletableFuture<>())) { - CompletableFuture future = masterStubMakeFuture.get(); - HBaseRpcController controller = getRpcController(); - masterStub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(), - new RpcCallback() { - @Override - public void run(IsMasterRunningResponse resp) { - if (controller.failed() || resp == null || - (resp != null && !resp.getIsMasterRunning())) { - makeMasterStub(future); - } else { - future.complete(masterStub); - } - } - }); - } else { - CompletableFuture future = masterStubMakeFuture.get(); - if (future != null) { - return future; - } - } - } + }); + return future; + }, stub -> true, "master stub"); + } + + void clearMasterStubCache(MasterService.Interface stub) { + masterStub.compareAndSet(stub, null); } private HBaseRpcController getRpcController() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java index e5594cbac7..5ba4dee525 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java @@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.io.netty.util.Timer; @@ -49,6 +51,14 @@ public class AsyncMasterRequestRpcRetryingCaller extends AsyncRpcRetryingCall this.callable = callable; } + private void clearMasterStubCacheOnError(MasterService.Interface stub, Throwable error) { + // ServerNotRunningYetException may because it is the backup master. + if (ClientExceptionsUtil.isConnectionException(error) || + error instanceof ServerNotRunningYetException) { + conn.clearMasterStubCache(stub); + } + } + @Override protected void doCall() { addListener(conn.getMasterStub(), (stub, error) -> { @@ -60,8 +70,8 @@ public class AsyncMasterRequestRpcRetryingCaller extends AsyncRpcRetryingCall resetCallTimeout(); addListener(callable.call(controller, stub), (result, error2) -> { if (error2 != null) { - onError(error2, () -> "Call to master failed", err -> { - }); + onError(error2, () -> "Call to master failed", + err -> clearMasterStubCacheOnError(stub, error2)); return; } future.complete(result); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java index 175f8f2219..fa08795b17 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java @@ -22,15 +22,12 @@ import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegi import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation; -import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * The asynchronous locator for meta region. @@ -38,8 +35,6 @@ import org.slf4j.LoggerFactory; @InterfaceAudience.Private class AsyncMetaRegionLocator { - private static final Logger LOG = LoggerFactory.getLogger(AsyncMetaRegionLocator.class); - private final AsyncRegistry registry; private final AtomicReference metaRegionLocations = new AtomicReference<>(); @@ -61,45 +56,8 @@ class AsyncMetaRegionLocator { * cached region locations and cause an infinite loop. */ CompletableFuture getRegionLocations(int replicaId, boolean reload) { - for (;;) { - if (!reload) { - RegionLocations locs = this.metaRegionLocations.get(); - if (isGood(locs, replicaId)) { - return CompletableFuture.completedFuture(locs); - } - } - LOG.trace("Meta region location cache is null, try fetching from registry."); - if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) { - LOG.debug("Start fetching meta region location from registry."); - CompletableFuture future = metaRelocateFuture.get(); - addListener(registry.getMetaRegionLocation(), (locs, error) -> { - if (error != null) { - LOG.debug("Failed to fetch meta region location from registry", error); - metaRelocateFuture.getAndSet(null).completeExceptionally(error); - return; - } - LOG.debug("The fetched meta region location is {}", locs); - // Here we update cache before reset future, so it is possible that someone can get a - // stale value. Consider this: - // 1. update cache - // 2. someone clear the cache and relocate again - // 3. the metaRelocateFuture is not null so the old future is used. - // 4. we clear metaRelocateFuture and complete the future in it with the value being - // cleared in step 2. - // But we do not think it is a big deal as it rarely happens, and even if it happens, the - // caller will retry again later, no correctness problems. - this.metaRegionLocations.set(locs); - metaRelocateFuture.set(null); - future.complete(locs); - }); - return future; - } else { - CompletableFuture future = metaRelocateFuture.get(); - if (future != null) { - return future; - } - } - } + return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload, + registry::getMetaRegionLocation, locs -> isGood(locs, replicaId), "meta region location"); } private HRegionLocation getCacheLocation(HRegionLocation loc) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index fea7a1e69c..101dda055d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -32,7 +32,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; @@ -617,4 +620,48 @@ public final class ConnectionUtils { return HConstants.NORMAL_QOS; } } + + static CompletableFuture getOrFetch(AtomicReference cacheRef, + AtomicReference> futureRef, boolean reload, + Supplier> fetch, Predicate validator, String type) { + for (;;) { + if (!reload) { + T value = cacheRef.get(); + if (value != null && validator.test(value)) { + return CompletableFuture.completedFuture(value); + } + } + LOG.trace("{} cache is null, try fetching from registry", type); + if (futureRef.compareAndSet(null, new CompletableFuture<>())) { + LOG.debug("Start fetching{} from registry", type); + CompletableFuture future = futureRef.get(); + addListener(fetch.get(), (value, error) -> { + if (error != null) { + LOG.debug("Failed to fetch {} from registry", type, error); + futureRef.getAndSet(null).completeExceptionally(error); + return; + } + LOG.debug("The fetched {} is {}", type, value); + // Here we update cache before reset future, so it is possible that someone can get a + // stale value. Consider this: + // 1. update cacheRef + // 2. someone clears the cache and relocates again + // 3. the futureRef is not null so the old future is used. + // 4. we clear futureRef and complete the future in it with the value being + // cleared in step 2. + // But we do not think it is a big deal as it rarely happens, and even if it happens, the + // caller will retry again later, no correctness problems. + cacheRef.set(value); + futureRef.set(null); + future.complete(value); + }); + return future; + } else { + CompletableFuture future = futureRef.get(); + if (future != null) { + return future; + } + } + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminMasterSwitch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminMasterSwitch.java new file mode 100644 index 0000000000..205de1db86 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminMasterSwitch.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.EnumSet; +import org.apache.hadoop.hbase.ClusterMetrics; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Testcase for HBASE-22135. + */ +@RunWith(Parameterized.class) +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncAdminMasterSwitch extends TestAsyncAdminBase { + + @Test + public void testSwitch() throws IOException, InterruptedException { + assertEquals(TEST_UTIL.getHBaseCluster().getRegionServerThreads().size(), + admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.SERVERS_NAME)).join() + .getServersName().size()); + // stop the old master, and start a new one + TEST_UTIL.getMiniHBaseCluster().startMaster(); + TEST_UTIL.getMiniHBaseCluster().stopMaster(0).join(); + assertTrue(TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(30000)); + // make sure that we could still call master + assertEquals(TEST_UTIL.getHBaseCluster().getRegionServerThreads().size(), + admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.SERVERS_NAME)).join() + .getServersName().size()); + } +} -- 2.17.1