diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 9a449ad..8f513d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -226,7 +226,7 @@
     return this.replicationManager;
   }
 
-  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
+  public void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws IOException {
     try {
       this.replicationManager.addHFileRefs(tableName, family, pairs);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
index f99ed75..a6790f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
@@ -72,7 +72,7 @@
     // just going to break. This is all private. Not allowed. Regions shouldn't assume they are
     // hosted in a RegionServer. TODO: fix.
     RegionServerServices rss = ((HasRegionServerServices)env).getRegionServerServices();
-    Replication rep = (Replication)((HRegionServer)rss).getReplicationSourceService();
+    Replication rep = (Replication)rss.getReplicationSourceService();
     rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a2664ce..4d5fed1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -39,6 +39,8 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.math.BigDecimal;
@@ -61,6 +63,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -128,6 +131,7 @@
 import org.apache.hadoop.hbase.filter.SubstringComparator;
 import org.apache.hadoop.hbase.filter.ValueFilter;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -139,8 +143,16 @@
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
@@ -150,6 +162,7 @@
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.FaultyFSLog;
@@ -161,6 +174,11 @@
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -169,6 +187,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
@@ -177,20 +196,6 @@
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
-import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
-import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 
 /**
  * Basic stand-alone testing of HRegion. No clusters!
@@ -220,6 +225,8 @@
   HRegion region = null;
   // Do not run unit tests in parallel (? Why not? It don't work? Why not? St.Ack)
   protected static HBaseTestingUtility TEST_UTIL;
+  @ClassRule
+  public static TemporaryFolder testFolder = new TemporaryFolder();
   public static Configuration CONF ;
   private String dir;
   private final int MAX_VERSIONS = 2;
@@ -6281,6 +6288,44 @@
       getCoprocessors().contains(ReplicationObserver.class.getSimpleName()));
   }
 
+  @Test
+  public void testSkipAddHFileRefNode() throws IOException { // test for HBASE-22335
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
+    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
+    Replication replication = Mockito.mock(Replication.class);
+    Mockito.doThrow(
+      new IOException("REPLICATION_SCOPE is 0, should skip recording bulk load entries."))
+      .when(replication).addHFileRefsToQueue(Mockito.any(), Mockito.any(), Mockito.any());
+    when(rss.getReplicationSourceService()).thenReturn(replication);
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
+    htd.addFamily(new HColumnDescriptor(fam1));
+    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
+      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
+    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(),
+      rss, null);
+
+    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
+    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(
+      TEST_UTIL.getConfiguration());
+    File hFileLocation = testFolder.newFile();
+    try (FSDataOutputStream out = new FSDataOutputStream(
+        new FileOutputStream(hFileLocation), null)) {
+      hFileFactory.withOutputStream(out);
+      hFileFactory.withFileContext(new HFileContext());
+      HFile.Writer writer = hFileFactory.create();
+      try {
+        byte[] randomBytes = new byte[100];
+        writer.append(new KeyValue(CellUtil.createCell(randomBytes, fam1, randomBytes,
+          0L, KeyValue.Type.Put.getCode(), randomBytes)));
+      } finally {
+        writer.close();
+      }
+    }
+    familyPaths.add(new Pair<>(fam1, hFileLocation.getAbsoluteFile().getAbsolutePath()));
+    region.bulkLoadHFiles(familyPaths, false, null);
+  }
+
   /**
    * The same as HRegion class, the only difference is that instantiateHStore will
    * create a different HStore - HStoreForTesting. [HBASE-8518]
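
Note for reviewers on what the new test pins down: `new HColumnDescriptor(fam1)` leaves the family at its default REPLICATION_SCOPE of 0, and the mocked `Replication.addHFileRefsToQueue` throws on any invocation, so `bulkLoadHFiles` only completes if the region server skips recording HFile refs for non-replicated families. A minimal sketch of that guard follows, using the public HBase 2.x descriptor API; the class and method names are hypothetical illustration only, not part of this patch (the real check lives inside HBase's bulk load / observer path):

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;

// Hypothetical helper illustrating the condition the test exercises.
public final class BulkLoadReplicationCheck {
  private BulkLoadReplicationCheck() {
  }

  /**
   * Returns true when bulk-loaded HFiles for this family should be queued for
   * replication. REPLICATION_SCOPE_LOCAL (0) means the family is not
   * replicated, so Replication#addHFileRefsToQueue must not be called for it.
   */
  public static boolean shouldRecordBulkLoad(ColumnFamilyDescriptor family) {
    return family.getScope() != HConstants.REPLICATION_SCOPE_LOCAL;
  }
}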