From 346023a722ff864ba5c457d3d4c35b4b2d9fca80 Mon Sep 17 00:00:00 2001
From: Yi Deng
Date: Tue, 7 Oct 2014 17:23:11 -0700
Subject: [PATCH] [HBASE-12198] Fix the bug of not updating location cache

Summary: # Clear the cache of the server when failed

Test Plan: Add testcase `TestHTableMultiplexerFlushCache` to reproduce the bug.

Differential Revision: https://reviews.facebook.net/D24603
---
 .../apache/hadoop/hbase/client/AsyncProcess.java  |   7 +-
 .../hadoop/hbase/client/HTableMultiplexer.java    |  47 +++++----
 .../client/TestHTableMultiplexerFlushCache.java   | 115 +++++++++++++++++++++
 3 files changed, 147 insertions(+), 22 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 2dbe263..368eab8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -1013,6 +1013,9 @@ class AsyncProcess {
 
       Retry canRetry = errorsByServer.canRetryMore(numAttempt)
           ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
+      if (tableName == null) {
+        hConnection.clearCaches(server);
+      }
       int failed = 0, stopped = 0;
       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
       for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
@@ -1021,7 +1024,9 @@ class AsyncProcess {
         // Do not use the exception for updating cache because it might be coming from
         // any of the regions in the MultiAction.
         // TODO: depending on type of exception we might not want to update cache at all?
-        hConnection.updateCachedLocations(tableName, regionName, row, null, server);
+        if (tableName != null) {
+          hConnection.updateCachedLocations(tableName, regionName, row, null, server);
+        }
         for (Action<Row> action : e.getValue()) {
           Retry retry = manageError(
               action.getOriginalIndex(), action.getAction(), canRetry, t, server);
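The guard above matters because of the HTableMultiplexer changes that follow: its flush workers submit multi-actions without a TableName, so there is no per-region cache entry that could be refreshed, and the only safe fallback is to drop every location cached for the failing server. Below is a minimal sketch of that decision, reusing the two connection calls from the hunks above; the wrapper class and method name are illustrative, not part of the patch, and it is placed in the client package because these connection interfaces are internal.

package org.apache.hadoop.hbase.client;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;

final class LocationCacheFallback {
  private LocationCacheFallback() {
  }

  // Mirrors the cache-invalidation choice made in AsyncProcess above: with no table
  // to key the update on, clear everything cached for the server; otherwise refresh
  // only the cached location of the affected region.
  static void onMultiActionFailure(ClusterConnection connection, TableName tableName,
      byte[] regionName, byte[] row, ServerName server) {
    if (tableName == null) {
      connection.clearCaches(server);
    } else {
      connection.updateCachedLocations(tableName, regionName, row, null, server);
    }
  }
}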
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index fae0a94..8d0fbc8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -79,7 +81,7 @@ public class HTableMultiplexer {
   private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
       new ConcurrentHashMap<>();
 
-  private final Configuration conf;
+  private final Configuration workerConf;
   private final ClusterConnection conn;
   private final ExecutorService pool;
   private final int retryNum;
@@ -95,10 +97,9 @@ public class HTableMultiplexer {
    */
   public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
       throws IOException {
-    this.conf = conf;
     this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
     this.pool = HTable.getDefaultExecutor(conf);
-    this.retryNum = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+    this.retryNum = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
       HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
     this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
@@ -107,6 +108,11 @@ public class HTableMultiplexer {
     this.executor = Executors.newScheduledThreadPool(initThreads,
       new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
+
+    this.workerConf = HBaseConfiguration.create(conf);
+    // We do not do the retry because we need to reassign puts to different queues if regions are
+    // moved.
+    this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
   }
 
   /**
@@ -218,7 +224,7 @@ public class HTableMultiplexer {
       worker = serverToFlushWorkerMap.get(addr);
       if (worker == null) {
         // Create the flush worker
-        worker = new FlushWorker(conf, this.conn, addr, this, perRegionServerBufferQueueSize,
+        worker = new FlushWorker(workerConf, this.conn, addr, this, perRegionServerBufferQueueSize,
           pool, executor);
         this.serverToFlushWorkerMap.put(addr, worker);
         executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
@@ -388,32 +394,30 @@ public class HTableMultiplexer {
   private static class FlushWorker implements Runnable {
     private final HRegionLocation addr;
-    private final AsyncProcess asyncProc;
     private final LinkedBlockingQueue<PutStatus> queue;
     private final HTableMultiplexer multiplexer;
     private final AtomicLong totalFailedPutCount = new AtomicLong(0);
     private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
     private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
     private final AtomicLong maxLatency = new AtomicLong(0);
-    private final ExecutorService pool;
+
+    private final AsyncProcess ap;
     private final List<PutStatus> processingList = new ArrayList<>();
     private final ScheduledExecutorService executor;
     private final int maxRetryInQueue;
     private final AtomicInteger retryInQueue = new AtomicInteger(0);
-    private final int rpcTimeOutMs;
 
     public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
         HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
         ExecutorService pool, ScheduledExecutorService executor) {
       this.addr = addr;
-      this.asyncProc = conn.getAsyncProcess();
       this.multiplexer = htableMultiplexer;
       this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
-      this.pool = pool;
+      RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
+      RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
+      this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory);
       this.executor = executor;
       this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
-      this.rpcTimeOutMs =
-          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     }
 
     protected LinkedBlockingQueue<PutStatus> getQueue() {
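The design choice in the hunks above is that each FlushWorker now owns its own AsyncProcess, built from a copy of the client configuration with retries turned off: a put that hits a moved region fails back to the worker immediately and is re-queued against the region's current server, instead of being retried inside the client against the stale location. A minimal sketch of producing such a configuration copy, using the same calls as the constructor earlier in this patch (the wrapper class is illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

final class WorkerConfFactory {
  private WorkerConfFactory() {
  }

  // Returns a copy of the caller's configuration with client-side retries disabled;
  // resubmission is left to the multiplexer, which re-resolves the region location first.
  static Configuration noRetryCopy(Configuration conf) {
    Configuration workerConf = HBaseConfiguration.create(conf);
    workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
    return workerConf;
  }
}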
@@ -456,10 +460,11 @@ public class HTableMultiplexer {
 
       // The currentPut is failed. So get the table name for the currentPut.
       final TableName tableName = ps.regionInfo.getTable();
-      // Wait at least RPC timeout time
-      long delayMs = rpcTimeOutMs;
-      delayMs = Math.max(delayMs, (long) (multiplexer.flushPeriod * Math.pow(2,
-        multiplexer.retryNum - retryCount)));
+      long delayMs = ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
+        multiplexer.retryNum - retryCount - 1);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
+      }
 
       executor.schedule(new Runnable() {
         @Override
@@ -513,8 +518,8 @@ public class HTableMultiplexer {
           Collections.singletonMap(server, actions);
       try {
         AsyncRequestFuture arf =
-            asyncProc.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
-              null, actionsByServer, pool);
+            ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
+              null, actionsByServer, null);
         arf.waitUntilDone();
         if (arf.hasError()) {
           // We just log and ignore the exception here since failed Puts will be resubmit again.
@@ -523,20 +528,20 @@ public class HTableMultiplexer {
         }
       } finally {
         for (int i = 0; i < results.length; i++) {
-          if (results[i] == null) {
+          if (results[i] instanceof Result) {
+            failedCount--;
+          } else {
             if (failed == null) {
               failed = new ArrayList<PutStatus>();
             }
             failed.add(processingList.get(i));
-          } else {
-            failedCount--;
           }
         }
       }
 
       if (failed != null) {
         // Resubmit failed puts
-        for (PutStatus putStatus : processingList) {
+        for (PutStatus putStatus : failed) {
           if (resubmitFailedPut(putStatus, this.addr)) {
             failedCount--;
           }
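A note on the resubmit delay above: it now comes from ConnectionUtils.getPauseTime(flushPeriod, tries) instead of a flat RPC-timeout floor, so resubmits back off progressively the longer a put keeps failing. The sketch below only prints the resulting schedule; the exact multipliers and jitter are details of ConnectionUtils in your HBase version, not something this patch defines, and the 100 ms flush period is an assumed example value.

import org.apache.hadoop.hbase.client.ConnectionUtils;

public class BackoffSketch {
  public static void main(String[] args) {
    long flushPeriodMs = 100; // assumed example value for the multiplexer flush period
    for (int tries = 0; tries < 6; tries++) {
      // With the usual HConstants.RETRY_BACKOFF table this grows roughly as
      // 100, 200, 300, 500, 1000, 2000 ms, plus a small amount of jitter.
      System.out.println("try " + tries + ": "
          + ConnectionUtils.getPauseTime(flushPeriodMs, tries) + " ms");
    }
  }
}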
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java
new file mode 100644
index 0000000..2898369
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexerFlushCache.java
@@ -0,0 +1,115 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ LargeTests.class, ClientTests.class })
+public class TestHTableMultiplexerFlushCache {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier_1");
+  private static byte[] QUALIFIER2 = Bytes.toBytes("testQualifier_2");
+  private static byte[] VALUE1 = Bytes.toBytes("testValue1");
+  private static byte[] VALUE2 = Bytes.toBytes("testValue2");
+  private static int SLAVES = 3;
+  private static int PER_REGIONSERVER_QUEUE_SIZE = 100000;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(SLAVES);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private static void checkExistence(HTable htable, byte[] row, byte[] family, byte[] quality,
+      byte[] value) throws Exception {
+    // verify that the Get returns the correct result
+    Result r;
+    Get get = new Get(row);
+    get.addColumn(family, quality);
+    int nbTry = 0;
+    do {
+      assertTrue("Fail to get from " + htable.getName() + " after " + nbTry + " tries", nbTry < 50);
+      nbTry++;
+      Thread.sleep(100);
+      r = htable.get(get);
+    } while (r == null || r.getValue(family, quality) == null);
+    assertEquals("value", Bytes.toStringBinary(value),
+      Bytes.toStringBinary(r.getValue(family, quality)));
+  }
+
+  @Test
+  public void testOnRegionChange() throws Exception {
+    TableName TABLE = TableName.valueOf("testOnRegionChange");
+    final int NUM_REGIONS = 10;
+    HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3,
+      Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
+
+    HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
+      PER_REGIONSERVER_QUEUE_SIZE);
+
+    byte[][] startRows = htable.getStartKeys();
+    byte[] row = startRows[1];
+    assertTrue("2nd region should not start with empty row", row != null && row.length > 0);
+
+    Put put = new Put(row).add(FAMILY, QUALIFIER1, VALUE1);
+    assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
+
+    checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1);
+
+    // Now let's shutdown the regionserver and let regions moved to other servers.
+    HRegionLocation loc = htable.getRegionLocation(row);
+    MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster();
+    hbaseCluster.stopRegionServer(loc.getServerName());
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
+
+    // put with multiplexer.
+    put = new Put(row).add(FAMILY, QUALIFIER2, VALUE2);
+    assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
+
+    checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
+  }
+}
-- 
2.0.1
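The new test exercises the fix end to end: it writes one cell through the multiplexer, stops the region server hosting that row, waits for the regions to be reassigned, and then writes a second cell; before this patch the second put could sit indefinitely in the queue of the dead server because its cached location was never invalidated. For reference, a minimal application-side usage sketch of the class under test (the table name, column names and queue size are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTableMultiplexer;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiplexerExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // One buffered queue of up to 100000 puts per region server.
    HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 100000);
    Put put = new Put(Bytes.toBytes("row-1"))
        .add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    // put() is best-effort: false means the per-server queue was full and the
    // caller has to retry or buffer the put itself.
    if (!multiplexer.put(TableName.valueOf("example_table"), put)) {
      System.err.println("Queue full; put was not accepted");
    }
  }
}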