From 6966c1f484c65dd29283cbc095ee0d9f2ddc5c26 Mon Sep 17 00:00:00 2001 From: yaojingyi Date: Fri, 5 Jul 2019 23:09:55 +0800 Subject: [PATCH] HBASE-22620 --- .../regionserver/ReplicationSource.java | 49 ++++++- .../ReplicationSourceWALReaderThread.java | 5 + .../hbase/replication/TestReplicationSource.java | 143 +++++++++++++++++++-- 3 files changed, 180 insertions(+), 17 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index d3f2620749..d86d994cc5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -18,10 +18,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.Service; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -35,6 +31,10 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Service; + import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -513,6 +513,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf // Current state of the worker thread private WorkerState state; ReplicationSourceWALReaderThread entryReader; + private final int DEFAULT_TIMEOUT = 20000; + private final int getEntriesTimeout; public ReplicationSourceShipperThread(String walGroupId, PriorityBlockingQueue queue, ReplicationQueueInfo replicationQueueInfo, @@ -521,6 +523,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.queue = queue; this.replicationQueueInfo = replicationQueueInfo; this.source = source; + this.getEntriesTimeout = conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds } @Override @@ -547,7 +550,43 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } try { - WALEntryBatch entryBatch = entryReader.take(); +// WALEntryBatch entryBatch = entryReader.take(); + + WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout); + + String lastCheckPath = null; + // do while until entryBatch is not null + while (entryBatch == null) { + + if (lastCheckPath == null) { + lastCheckPath = entryReader.getCurrentPath().getName(); + } + if (lastCheckPath != null && !lastCheckPath + .equals(entryReader.getCurrentPath().getName())) { + String toRemoveLog = lastCheckPath; + //clean znode + replicationQueues.removeLog(peerClusterZnode, toRemoveLog); + lastCheckPath = entryReader.getCurrentPath().getName(); + if (manager.getWalsByIdRecoveredQueues().get(peerClusterZnode) + != null) { + manager.getWalsByIdRecoveredQueues().get(peerClusterZnode) + .remove(toRemoveLog); + } + //-------- + + if (manager.getWALs().get(peerClusterZnode) != null) { + + String serverName = + DefaultWALProvider.getWALPrefixFromWALName(toRemoveLog); + manager.getWALs().get(peerClusterZnode).get(serverName) + .remove(toRemoveLog); + } + + } + entryBatch = entryReader.poll(getEntriesTimeout); + } + + shipEdits(entryBatch); if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty() && entryBatch.getLastSeqIds().isEmpty()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java index 872f91d08a..ee5fbefdda 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -227,6 +228,10 @@ public class ReplicationSourceWALReaderThread extends Thread { return entryBatchQueue.take(); } + public WALEntryBatch poll(long period) throws InterruptedException { + return entryBatchQueue.poll(period, TimeUnit.MILLISECONDS); + } + private long getEntrySize(Entry entry) { WALEdit edit = entry.getEdit(); return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index 990c5fd81f..5258b07446 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; import java.io.IOException; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -32,36 +34,46 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoordinatedStateManager; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.Waiter.Predicate; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.mockito.Mockito.mock; - @Category(MediumTests.class) public class TestReplicationSource { @@ -263,6 +275,113 @@ public class TestReplicationSource { } } + @Test + public void testLogRollToRemoveZnode() throws Exception { + + long logrollPeriod = 15000; + conf.setLong("hbase.regionserver.logroll.period", logrollPeriod);//15s + + // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in. + conf.set(HConstants.REGION_SERVER_IMPL, + ShutdownDelayRegionServer.class.getName()); + MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(1); + TEST_UTIL_PEER.startMiniCluster(1); + + HRegionServer serverA = cluster.getRegionServer(0); + final ReplicationSourceManager managerA = + ((Replication) serverA.getReplicationSourceService()) + .getReplicationManager(); + + TableName tablename = TableName.valueOf("TEST_TAB"); + + HTableDescriptor tableDesc1 = new HTableDescriptor(tablename); + HTableDescriptor tableDesc2 = new HTableDescriptor(tablename); + HColumnDescriptor fam = new HColumnDescriptor("C"); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + tableDesc1.addFamily(fam); + + HColumnDescriptor fam2 = new HColumnDescriptor("C"); + tableDesc2.addFamily(fam2); + + Connection connection1 = + ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + Connection connection2 = + ConnectionFactory.createConnection(TEST_UTIL_PEER.getConfiguration()); + + Admin admin = connection1.getAdmin(); + Admin admin2 = connection2.getAdmin(); + admin.createTable(tableDesc1); + admin2.createTable(tableDesc2); + + final ReplicationAdmin replicationAdmin = + new ReplicationAdmin(TEST_UTIL.getConfiguration()); + final String peerId = "testPeer"; + replicationAdmin.addPeer(peerId, new ReplicationPeerConfig() + .setClusterKey(TEST_UTIL_PEER.getClusterKey()), null); + // Wait for replication sources to come up + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() throws Exception { + return !(managerA.getSources().isEmpty()); + } + }); + + Table table = connection1.getTable(tablename); + String testValue = "testValue"; + String testValue2 = "testValue2"; + String testKey = "TestKey"; + Put put = new Put(testKey.getBytes()); + put.addColumn("C".getBytes(), "c".getBytes(), testValue.getBytes()); + + table.put(put); + + //get znode count + ZooKeeperWatcher zkw1 = + new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "cluster1", null, + true); + + String baseZnode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); + String replicationZnode = TEST_UTIL.getConfiguration() + .get("zookeeper.znode.replication", "replication"); + String rs = TEST_UTIL.getConfiguration() + .get("zookeeper.znode.replication.rs", "rs"); + String serverName = serverA.getServerName().toString(); + String znode = + baseZnode + "/" + replicationZnode + "/" + rs + "/" + serverName + "/" + + peerId; + List childList = + ZKUtil.getChildDataAndWatchForNewChildren(zkw1, znode); + assertEquals(childList.size(), 1); + //sleep 15s wait roll Wal + Thread.sleep(logrollPeriod); + + Put put2 = new Put(testKey.getBytes()); + put2.addColumn("C".getBytes(), "c".getBytes(), testValue2.getBytes()); + + table.put(put2); + //wait to ship to target table + Thread.sleep(1000); + + //get znode count & get table value + + Table table2 = connection2.getTable(tablename); + + Get get = new Get(testKey.getBytes()); + Result result = table2.get(get); + String val = null; + for (Cell cell : result.rawCells()) { + val = new String(CellUtil.cloneValue(cell)); + } + assertEquals(val, testValue2); + + //sleep 15s wait roll Wal + Thread.sleep(logrollPeriod); + + //get znode count + assertEquals(childList.size(), 1); + + } + /** * Regionserver implementation that adds a delay on the graceful shutdown. */ -- 2.15.1 (Apple Git-101)