From 798d9b9bdf316cfb46b9d17d649933fa7f0ba8d5 Mon Sep 17 00:00:00 2001 From: David Deng Date: Tue, 23 Sep 2014 22:46:03 -0700 Subject: [PATCH] HBASE-12086 Fix bug of HTableMultipliexer Signed-off-by: Elliott Clark Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java --- .../org/apache/hadoop/hbase/client/HTable.java | 17 +- .../hadoop/hbase/client/HTableMultiplexer.java | 226 +++++++++++---------- .../hadoop/hbase/client/TestHTableMultiplexer.java | 65 +++--- 3 files changed, 170 insertions(+), 138 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 3a4a715..b7b323f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -329,6 +329,13 @@ public class HTable implements HTableInterface { } /** + * @return maxKeyValueSize from configuration. + */ + public static int getMaxKeyValueSize(Configuration conf) { + return conf.getInt("hbase.client.keyvalue.maxsize", -1); + } + + /** * setup this HTable's parameter based on the passed configuration */ private void finishSetup() throws IOException { @@ -351,8 +358,7 @@ public class HTable implements HTableInterface { ap = new AsyncProcess(connection, tableName, pool, null, configuration, rpcCallerFactory, rpcControllerFactory); - this.maxKeyValueSize = this.configuration.getInt( - "hbase.client.keyvalue.maxsize", -1); + this.maxKeyValueSize = getMaxKeyValueSize(this.configuration); this.closed = false; } @@ -1299,7 +1305,12 @@ public class HTable implements HTableInterface { } // validate for well-formedness - public void validatePut(final Put put) throws IllegalArgumentException{ + public void validatePut(final Put put) throws IllegalArgumentException { + validatePut(put, maxKeyValueSize); + } + + // validate for well-formedness + public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException { if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert"); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 480a139..cf715dd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -22,13 +22,12 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -42,7 +41,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.AsyncProcess.AsyncProcessCallback; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** @@ -67,35 +67,35 @@ public class HTableMultiplexer { static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = "hbase.tablemultiplexer.flush.frequency.ms"; - private Map tableNameToHTableMap; - /** The map between each region server to its corresponding buffer queue */ - private Map> - serverToBufferQueueMap; + private final Map> serverToBufferQueueMap = + new ConcurrentHashMap>(); /** The map between each region server to its flush worker */ - private Map serverToFlushWorkerMap; + private final Map serverToFlushWorkerMap = + new ConcurrentHashMap(); - private Configuration conf; - private int retryNum; + private final Configuration conf; + private final HConnection conn; + private final ExecutorService pool; + private final int retryNum; private int perRegionServerBufferQueueSize; + private final int maxKeyValueSize; /** - * * @param conf The HBaseConfiguration - * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops - * for each region server before dropping the request. + * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for + * each region server before dropping the request. */ - public HTableMultiplexer(Configuration conf, - int perRegionServerBufferQueueSize) throws ZooKeeperConnectionException { + public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize) + throws IOException { this.conf = conf; - this.serverToBufferQueueMap = new ConcurrentHashMap>(); - this.serverToFlushWorkerMap = new ConcurrentHashMap(); - this.tableNameToHTableMap = new ConcurrentSkipListMap(); + this.conn = HConnectionManager.createConnection(conf); + this.pool = HTable.getDefaultExecutor(conf); this.retryNum = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize; + this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf); } /** @@ -110,10 +110,6 @@ public class HTableMultiplexer { return put(tableName, put, this.retryNum); } - public boolean put(byte[] tableName, final Put put) throws IOException { - return put(TableName.valueOf(tableName), put); - } - /** * The puts request will be buffered by their corresponding buffer queue. * Return the list of puts which could not be queued. @@ -165,15 +161,14 @@ public class HTableMultiplexer { return false; } - LinkedBlockingQueue queue; - HTable htable = getHTable(tableName); try { - htable.validatePut(put); - HRegionLocation loc = htable.getRegionLocation(put.getRow(), false); + HTable.validatePut(put, maxKeyValueSize); + HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false); if (loc != null) { // Add the put pair into its corresponding queue. - queue = addNewRegionServer(loc, htable); - // Generate a MultiPutStatus obj and offer it into the queue + + LinkedBlockingQueue queue = getQueue(loc); + // Generate a MultiPutStatus object and offer it into the queue PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry); return queue.offer(s); @@ -196,43 +191,30 @@ public class HTableMultiplexer { return new HTableMultiplexerStatus(serverToFlushWorkerMap); } - - private HTable getHTable(TableName tableName) throws IOException { - HTable htable = this.tableNameToHTableMap.get(tableName); - if (htable == null) { - synchronized (this.tableNameToHTableMap) { - htable = this.tableNameToHTableMap.get(tableName); - if (htable == null) { - htable = new HTable(conf, tableName); - this.tableNameToHTableMap.put(tableName, htable); + private LinkedBlockingQueue getQueue(HRegionLocation addr) { + LinkedBlockingQueue queue = serverToBufferQueueMap.get(addr); + if (queue == null) { + synchronized (this.serverToBufferQueueMap) { + queue = serverToBufferQueueMap.get(addr); + if (queue == null) { + // Create a queue for the new region server + queue = new LinkedBlockingQueue(perRegionServerBufferQueueSize); + serverToBufferQueueMap.put(addr, queue); + + // Create the flush worker + HTableFlushWorker worker = + new HTableFlushWorker(conf, this.conn, addr, this, queue, pool); + this.serverToFlushWorkerMap.put(addr, worker); + + // Launch a daemon thread to flush the puts + // from the queue to its corresponding region server. + String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-" + (poolID++); + Thread t = new Thread(worker, name); + t.setDaemon(true); + t.start(); } } } - return htable; - } - - private synchronized LinkedBlockingQueue addNewRegionServer( - HRegionLocation addr, HTable htable) { - LinkedBlockingQueue queue = - serverToBufferQueueMap.get(addr); - if (queue == null) { - // Create a queue for the new region server - queue = new LinkedBlockingQueue(perRegionServerBufferQueueSize); - serverToBufferQueueMap.put(addr, queue); - - // Create the flush worker - HTableFlushWorker worker = new HTableFlushWorker(conf, addr, - this, queue, htable); - this.serverToFlushWorkerMap.put(addr, worker); - - // Launch a daemon thread to flush the puts - // from the queue to its corresponding region server. - String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-" - + (poolID++); - Thread t = new Thread(worker, name); - t.setDaemon(true); - t.start(); - } return queue; } @@ -404,29 +386,28 @@ public class HTableMultiplexer { } } - private static class HTableFlushWorker implements Runnable { - private HRegionLocation addr; - private Configuration conf; - private LinkedBlockingQueue queue; - private HTableMultiplexer htableMultiplexer; - private AtomicLong totalFailedPutCount; - private AtomicInteger currentProcessingPutCount; - private AtomicAverageCounter averageLatency; - private AtomicLong maxLatency; - private HTable htable; // For Multi + private static class HTableFlushWorker implements Runnable, AsyncProcessCallback { + private final HRegionLocation addr; + private final Configuration conf; + private final LinkedBlockingQueue queue; + private final HTableMultiplexer htableMultiplexer; + private final AtomicLong totalFailedPutCount = new AtomicLong(0); + private final AtomicInteger currentProcessingPutCount = new AtomicInteger(0); + private final AtomicAverageCounter averageLatency = new AtomicAverageCounter(); + private final AtomicLong maxLatency = new AtomicLong(0); + private final AsyncProcess ap; + private final List results = new ArrayList(); - public HTableFlushWorker(Configuration conf, HRegionLocation addr, - HTableMultiplexer htableMultiplexer, - LinkedBlockingQueue queue, HTable htable) { + public HTableFlushWorker(Configuration conf, HConnection conn, HRegionLocation addr, + HTableMultiplexer htableMultiplexer, LinkedBlockingQueue queue, ExecutorService pool) { this.addr = addr; this.conf = conf; this.htableMultiplexer = htableMultiplexer; this.queue = queue; - this.totalFailedPutCount = new AtomicLong(0); - this.currentProcessingPutCount = new AtomicInteger(0); - this.averageLatency = new AtomicAverageCounter(); - this.maxLatency = new AtomicLong(0); - this.htable = htable; + RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); + RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); + this.ap = new AsyncProcess(conn, null, pool, this, conf, rpcCallerFactory, + rpcControllerFactory); } public long getTotalFailedCount() { @@ -494,16 +475,30 @@ public class HTableMultiplexer { currentProcessingPutCount.set(processingList.size()); if (processingList.size() > 0) { - ArrayList list = new ArrayList(processingList.size()); - for (PutStatus putStatus: processingList) { - list.add(putStatus.getPut()); + this.results.clear(); + List> retainedActions = new ArrayList>(processingList.size()); + MultiAction actions = new MultiAction(); + for (int i = 0; i < processingList.size(); i++) { + PutStatus putStatus = processingList.get(i); + Action action = new Action(putStatus.getPut(), i); + actions.add(putStatus.getRegionInfo().getRegionName(), action); + retainedActions.add(action); + this.results.add(null); } - // Process this multiput request - List failed = null; - Object[] results = new Object[list.size()]; + // Process this multi-put request + List failed = null; + Map> actionsByServer = + Collections.singletonMap(addr, actions); try { - htable.batch(list, results); + HConnectionManager.ServerErrorTracker errorsByServer = + new HConnectionManager.ServerErrorTracker(1, 10); + ap.sendMultiAction(retainedActions, actionsByServer, 10, errorsByServer); + ap.waitUntilDone(); + + if (ap.hasError()) { + throw ap.getErrors(); + } } catch (IOException e) { LOG.debug("Caught some exceptions " + e + " when flushing puts to region server " + addr.getHostnamePort()); @@ -513,35 +508,26 @@ public class HTableMultiplexer { // results are returned in the same order as the requests in list // walk the list backwards, so we can remove from list without // impacting the indexes of earlier members - for (int i = results.length - 1; i >= 0; i--) { - if (results[i] instanceof Result) { - // successful Puts are removed from the list here. - list.remove(i); + for (int i = 0; i < results.size(); i++) { + if (results.get(i) == null) { + if (failed == null) { + failed = new ArrayList(); + } + failed.add(processingList.get(i)); } } - failed = list; } if (failed != null) { - if (failed.size() == processingList.size()) { - // All the puts for this region server are failed. Going to retry it later - for (PutStatus putStatus: processingList) { - if (!resubmitFailedPut(putStatus, this.addr)) { - failedCount++; - } - } - } else { - Set failedPutSet = new HashSet(failed); - for (PutStatus putStatus: processingList) { - if (failedPutSet.contains(putStatus.getPut()) - && !resubmitFailedPut(putStatus, this.addr)) { - failedCount++; - } + // Resubmit failed puts + for (PutStatus putStatus : processingList) { + if (!resubmitFailedPut(putStatus, this.addr)) { + failedCount++; } } + // Update the totalFailedCount + this.totalFailedPutCount.addAndGet(failedCount); } - // Update the totalFailedCount - this.totalFailedPutCount.addAndGet(failedCount); elapsed = EnvironmentEdgeManager.currentTimeMillis() - start; // Update latency counters @@ -573,9 +559,27 @@ public class HTableMultiplexer { // Log all the exceptions and move on LOG.debug("Caught some exceptions " + e + " when flushing puts to region server " - + addr.getHostnamePort()); + + addr.getHostnamePort(), e); } } } + + @Override + public void success(int originalIndex, byte[] region, Row row, Object result) { + if (results == null || originalIndex >= results.size()) { + return; + } + results.set(originalIndex, result); + } + + @Override + public boolean failure(int originalIndex, byte[] region, Row row, Throwable t) { + return false; + } + + @Override + public boolean retriableFailure(int originalIndex, Row row, byte[] region, Throwable exception) { + return false; + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java index 2f0bf37..cd85c38 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java @@ -27,9 +27,9 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -63,10 +63,27 @@ public class TestHTableMultiplexer { TEST_UTIL.shutdownMiniCluster(); } + private static void checkExistence(HTable htable, byte[] row, byte[] family, byte[] quality) + throws Exception { + // verify that the Get returns the correct result + Result r; + Get get = new Get(row); + get.addColumn(FAMILY, QUALIFIER); + int nbTry = 0; + do { + assertTrue("Fail to get from " + htable.getName() + " after " + nbTry + " tries", nbTry < 50); + nbTry++; + Thread.sleep(100); + r = htable.get(get); + } while (r == null || r.getValue(FAMILY, QUALIFIER) == null); + assertEquals("value", Bytes.toStringBinary(VALUE1), + Bytes.toStringBinary(r.getValue(FAMILY, QUALIFIER))); + } + @Test public void testHTableMultiplexer() throws Exception { - TableName TABLE = - TableName.valueOf("testHTableMultiplexer"); + TableName TABLE_1 = TableName.valueOf("testHTableMultiplexer_1"); + TableName TABLE_2 = TableName.valueOf("testHTableMultiplexer_2"); final int NUM_REGIONS = 10; final int VERSION = 3; List failedPuts; @@ -75,35 +92,35 @@ public class TestHTableMultiplexer { HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), PER_REGIONSERVER_QUEUE_SIZE); - HTable ht = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, VERSION, + HTable htable1 = + TEST_UTIL.createTable(TABLE_1, new byte[][] { FAMILY }, VERSION, Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS); - TEST_UTIL.waitUntilAllRegionsAssigned(TABLE); + HTable htable2 = + TEST_UTIL.createTable(TABLE_2, new byte[][] { FAMILY }, VERSION, Bytes.toBytes("aaaaa"), + Bytes.toBytes("zzzzz"), NUM_REGIONS); + TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_1); + TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_2); - byte[][] startRows = ht.getStartKeys(); - byte[][] endRows = ht.getEndKeys(); + byte[][] startRows = htable1.getStartKeys(); + byte[][] endRows = htable1.getEndKeys(); // SinglePut case for (int i = 0; i < NUM_REGIONS; i++) { byte [] row = startRows[i]; if (row == null || row.length <= 0) continue; - Put put = new Put(row); - put.add(FAMILY, QUALIFIER, VALUE1); - success = multiplexer.put(TABLE, put); - assertTrue(success); + Put put = new Put(row).add(FAMILY, QUALIFIER, VALUE1); + success = multiplexer.put(TABLE_1, put); + assertTrue("multiplexer.put returns", success); + + put = new Put(row).add(FAMILY, QUALIFIER, VALUE1); + success = multiplexer.put(TABLE_2, put); + assertTrue("multiplexer.put failed", success); - LOG.info("Put for " + Bytes.toString(startRows[i]) + " @ iteration " + (i+1)); + LOG.info("Put for " + Bytes.toStringBinary(startRows[i]) + " @ iteration " + (i + 1)); // verify that the Get returns the correct result - Get get = new Get(startRows[i]); - get.addColumn(FAMILY, QUALIFIER); - Result r; - int nbTry = 0; - do { - assertTrue(nbTry++ < 50); - Thread.sleep(100); - r = ht.get(get); - } while (r == null || r.getValue(FAMILY, QUALIFIER) == null); - assertEquals(0, Bytes.compareTo(VALUE1, r.getValue(FAMILY, QUALIFIER))); + checkExistence(htable1, startRows[i], FAMILY, QUALIFIER); + checkExistence(htable2, startRows[i], FAMILY, QUALIFIER); } // MultiPut case @@ -115,7 +132,7 @@ public class TestHTableMultiplexer { put.add(FAMILY, QUALIFIER, VALUE2); multiput.add(put); } - failedPuts = multiplexer.put(TABLE, multiput); + failedPuts = multiplexer.put(TABLE_1, multiput); assertTrue(failedPuts == null); // verify that the Get returns the correct result @@ -129,7 +146,7 @@ public class TestHTableMultiplexer { do { assertTrue(nbTry++ < 50); Thread.sleep(100); - r = ht.get(get); + r = htable1.get(get); } while (r == null || r.getValue(FAMILY, QUALIFIER) == null || Bytes.compareTo(VALUE2, r.getValue(FAMILY, QUALIFIER)) != 0); } -- 1.9.4