From 2fd1f01b4a27bba03d803e29f30267bb1d5cf16d Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 2 Feb 2018 10:26:39 +0800 Subject: [PATCH] HBASE-19904 Break dependency of WAL constructor on Replication --- .../hbase/mapreduce/TestWALRecordReader.java | 4 +- .../hadoop/hbase/replication/ReplicationUtils.java | 10 ++ .../org/apache/hadoop/hbase/master/HMaster.java | 27 +++- .../apache/hadoop/hbase/regionserver/HRegion.java | 49 +++--- .../hadoop/hbase/regionserver/HRegionServer.java | 108 ++++++------- .../hbase/regionserver/ReplicationService.java | 16 +- .../regionserver/ReplicationSourceService.java | 7 - .../replication/regionserver/Replication.java | 176 +++++---------------- .../regionserver/ReplicationSourceManager.java | 72 +++++++-- .../regionserver/ReplicationSyncUp.java | 3 +- .../org/apache/hadoop/hbase/util/HBaseFsck.java | 6 +- .../hadoop/hbase/wal/AbstractFSWALProvider.java | 43 +++-- .../hadoop/hbase/wal/DisabledWALProvider.java | 8 +- .../apache/hadoop/hbase/wal/FSHLogProvider.java | 10 +- .../hadoop/hbase/wal/RegionGroupingProvider.java | 35 ++-- .../org/apache/hadoop/hbase/wal/WALFactory.java | 42 ++--- .../org/apache/hadoop/hbase/wal/WALProvider.java | 46 ++++-- .../apache/hadoop/hbase/HBaseTestingUtility.java | 6 +- .../hadoop/hbase/coprocessor/TestWALObserver.java | 7 +- .../hbase/master/cleaner/TestLogsCleaner.java | 4 +- .../cleaner/TestReplicationHFileCleaner.java | 4 +- .../regionserver/TestCacheOnWriteInSchema.java | 2 +- .../TestCompactionArchiveConcurrentClose.java | 2 +- .../TestCompactionArchiveIOException.java | 2 +- .../hbase/regionserver/TestDefaultMemStore.java | 2 +- .../hadoop/hbase/regionserver/TestHMobStore.java | 2 +- .../hadoop/hbase/regionserver/TestHRegion.java | 23 ++- .../hadoop/hbase/regionserver/TestHStore.java | 2 +- .../regionserver/TestStoreFileRefresherChore.java | 2 +- .../TestWALMonotonicallyIncreasingSeqId.java | 2 +- .../regionserver/wal/AbstractTestLogRolling.java | 6 +- .../regionserver/wal/AbstractTestProtobufLog.java | 3 +- .../regionserver/wal/AbstractTestWALReplay.java | 11 +- .../hbase/regionserver/wal/TestDurability.java | 6 +- .../hbase/regionserver/wal/TestLogRollAbort.java | 2 +- .../regionserver/wal/TestLogRollingNoCluster.java | 2 +- .../regionserver/wal/TestWALActionsListener.java | 7 +- .../TestReplicationEmptyWALRecovery.java | 6 +- .../replication/TestReplicationSmallTests.java | 18 --- .../regionserver/TestReplicationSourceManager.java | 51 +++++- .../regionserver/TestWALEntryStream.java | 7 +- .../apache/hadoop/hbase/wal/IOTestProvider.java | 69 +++++--- .../wal/TestBoundedRegionGroupingStrategy.java | 2 +- .../hadoop/hbase/wal/TestFSHLogProvider.java | 6 +- .../org/apache/hadoop/hbase/wal/TestSecureWAL.java | 2 +- .../apache/hadoop/hbase/wal/TestWALFactory.java | 2 +- .../apache/hadoop/hbase/wal/TestWALMethods.java | 2 +- .../hadoop/hbase/wal/TestWALReaderOnSecureWAL.java | 7 +- .../apache/hadoop/hbase/wal/TestWALRootDir.java | 2 +- .../org/apache/hadoop/hbase/wal/TestWALSplit.java | 4 +- .../hadoop/hbase/wal/WALPerformanceEvaluation.java | 2 +- 51 files changed, 465 insertions(+), 474 deletions(-) diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index 87d100b..e486714 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -127,7 +127,7 @@ public class TestWALRecordReader { */ @Test public void testPartialRead() throws Exception { - final WALFactory walfactory = new WALFactory(conf, null, getName()); + final WALFactory walfactory = new WALFactory(conf, getName()); WAL log = walfactory.getWAL(info); // This test depends on timestamp being millisecond based and the filename of the WAL also // being millisecond based. @@ -186,7 +186,7 @@ public class TestWALRecordReader { */ @Test public void testWALRecordReader() throws Exception { - final WALFactory walfactory = new WALFactory(conf, null, getName()); + final WALFactory walfactory = new WALFactory(conf, getName()); WAL log = walfactory.getWAL(info); byte [] value = Bytes.toBytes("value"); WALEdit edit = new WALEdit(); diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index ebe68a7..11507aa 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -122,4 +123,13 @@ public final class ReplicationUtils { isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap()); } } + + /** + * @param c Configuration to look at + * @return True if replication for bulk load data is enabled. + */ + public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) { + return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index d422960..dc1763c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.master; +import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; + import com.google.protobuf.Descriptors; import com.google.protobuf.Service; import java.io.IOException; @@ -166,8 +168,10 @@ import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; +import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader; -import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.trace.TraceUtil; @@ -484,7 +488,7 @@ public class HMaster extends HRegionServer implements MasterServices { // Disable usage of meta replicas in the master this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); - Replication.decorateMasterConfiguration(this.conf); + decorateMasterConfiguration(this.conf); // Hack! Maps DFSClient => Master for logs. HDFS made this // config param for task trackers, but we can piggyback off of it. @@ -3557,4 +3561,23 @@ public class HMaster extends HRegionServer implements MasterServices { public ReplicationPeerManager getReplicationPeerManager() { return replicationPeerManager; } + + /** + * This method modifies the master's configuration in order to inject replication-related features + */ + @VisibleForTesting + public static void decorateMasterConfiguration(Configuration conf) { + String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS); + String cleanerClass = ReplicationLogCleaner.class.getCanonicalName(); + if (!plugins.contains(cleanerClass)) { + conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass); + } + if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) { + plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS); + cleanerClass = ReplicationHFileCleaner.class.getCanonicalName(); + if (!plugins.contains(cleanerClass)) { + conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass); + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ecc9a74..7a6af75 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + +import edu.umd.cs.findbugs.annotations.Nullable; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; @@ -69,7 +71,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -148,28 +149,6 @@ import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; -import org.apache.hbase.thirdparty.com.google.common.collect.Maps; -import org.apache.hbase.thirdparty.com.google.common.io.Closeables; -import org.apache.hbase.thirdparty.com.google.protobuf.Service; -import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; -import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; -import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotManifest; import org.apache.hadoop.hbase.trace.TraceUtil; @@ -200,7 +179,29 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import edu.umd.cs.findbugs.annotations.Nullable; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.common.collect.Maps; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.com.google.protobuf.Service; +import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; /** * Regions store data for a certain region of a table. It stores all columns diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 3844415..cb7e2d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -131,10 +131,9 @@ import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; -import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; @@ -158,6 +157,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -546,7 +546,7 @@ public class HRegionServer extends HasThread implements checkCodecs(this.conf); this.userProvider = UserProvider.instantiate(conf); FSUtils.setupShortCircuitRead(this.conf); - Replication.decorateRegionServerConfiguration(this.conf); + decorateRegionServerConfiguration(this.conf); // Disable usage of meta replicas in the regionserver this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); @@ -1781,52 +1781,26 @@ public class HRegionServer extends HasThread implements } /** - * Setup WAL log and replication if enabled. - * Replication setup is done in here because it wants to be hooked up to WAL. - * - * @throws IOException + * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to + * be hooked up to WAL. */ private void setupWALAndReplication() throws IOException { + WALFactory factory = new WALFactory(conf, serverName.toString()); + // TODO Replication make assumptions here based on the default filesystem impl Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); Path logDir = new Path(walRootDir, logName); - if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir); + LOG.debug("logDir={}", logDir); if (this.walFs.exists(logDir)) { - throw new RegionServerRunningException("Region server has already " + - "created directory at " + this.serverName.toString()); + throw new RegionServerRunningException( + "Region server has already created directory at " + this.serverName.toString()); } - - // Instantiate replication if replication enabled. Pass it the log directories. - // In here we create the Replication instances. Later they are initialized and started up. - createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir); - - // listeners the wal factory will add to wals it creates. - List listeners = new ArrayList<>(); - listeners.add(new MetricsWAL()); - if (this.replicationSourceHandler != null && - this.replicationSourceHandler.getWALActionsListener() != null) { - // Replication handler is an implementation of WALActionsListener. - listeners.add(this.replicationSourceHandler.getWALActionsListener()); - } - - // There is a cyclic dependency between ReplicationSourceHandler and WALFactory. - // We use WALActionsListener to get the newly rolled WALs, so we need to get the - // WALActionsListeners from ReplicationSourceHandler before constructing WALFactory. And then - // ReplicationSourceHandler need to use WALFactory get the length of the wal file being written. - // So we here we need to construct WALFactory first, and then pass it to the initialized method - // of ReplicationSourceHandler. - // TODO: I can't follow replication; it has initialize and then later on we start it! - WALFactory factory = new WALFactory(conf, listeners, serverName.toString()); + // Instantiate replication if replication enabled. Pass it the log directories. + createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, + factory.getWALProvider()); this.walFactory = factory; - if (this.replicationSourceHandler != null) { - this.replicationSourceHandler.initialize(this, walFs, logDir, oldLogDir, factory); - } - if (this.replicationSinkHandler != null && - this.replicationSinkHandler != this.replicationSourceHandler) { - this.replicationSinkHandler.initialize(this, walFs, logDir, oldLogDir, factory); - } } /** @@ -2918,15 +2892,13 @@ public class HRegionServer extends HasThread implements // // Main program and support routines // - /** * Load the replication executorService objects, if any */ private static void createNewReplicationInstance(Configuration conf, HRegionServer server, - FileSystem walFs, Path walDir, Path oldWALDir) throws IOException { - - if ((server instanceof HMaster) && (!LoadBalancer.isTablesOnMaster(conf) || - LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { + FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException { + if ((server instanceof HMaster) && + (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { return; } @@ -2941,32 +2913,30 @@ public class HRegionServer extends HasThread implements // If both the sink and the source class names are the same, then instantiate // only one object. if (sourceClassname.equals(sinkClassname)) { - server.replicationSourceHandler = - (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs, - walDir, oldWALDir); + server.replicationSourceHandler = newReplicationInstance(sourceClassname, + ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; } else { - server.replicationSourceHandler = - (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs, - walDir, oldWALDir); - server.replicationSinkHandler = (ReplicationSinkService) newReplicationInstance(sinkClassname, - conf, server, walFs, walDir, oldWALDir); + server.replicationSourceHandler = newReplicationInstance(sourceClassname, + ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); + server.replicationSinkHandler = newReplicationInstance(sinkClassname, + ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider); } } - private static ReplicationService newReplicationInstance(String classname, Configuration conf, - HRegionServer server, FileSystem walFs, Path logDir, Path oldLogDir) throws IOException { - Class clazz = null; + private static T newReplicationInstance(String classname, + Class xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, + Path oldLogDir, WALProvider walProvider) throws IOException { + Class clazz = null; try { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - clazz = Class.forName(classname, true, classLoader).asSubclass(ReplicationService.class); + clazz = Class.forName(classname, true, classLoader).asSubclass(xface); } catch (java.lang.ClassNotFoundException nfe) { throw new IOException("Could not find class for " + classname); } - - // create an instance of the replication object, but do not initialize it here as we need to use - // WALFactory when initializing. - return ReflectionUtils.newInstance(clazz, conf); + T service = ReflectionUtils.newInstance(clazz, conf); + service.initialize(server, walFs, logDir, oldLogDir, walProvider); + return service; } /** @@ -3739,4 +3709,20 @@ public class HRegionServer extends HasThread implements throw ProtobufUtil.getRemoteException(se); } } + + /** + * This method modifies the region server's configuration in order to inject replication-related + * features + * @param conf region server configurations + */ + static void decorateRegionServerConfiguration(Configuration conf) { + if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) { + String plugins = conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, ""); + String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName(); + if (!plugins.contains(rsCoprocessorClass)) { + conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, + plugins + "," + rsCoprocessorClass); + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index f3bc188..c34231d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,13 +18,12 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; -import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.yetus.audience.InterfaceAudience; /** * Gateway to Cluster Replication. Used by @@ -37,14 +35,14 @@ public interface ReplicationService { /** * Initializes the replication service object. - * @throws IOException + * @param walProvider can be null if not initialized inside a live region server environment, for + * example, {@code ReplicationSyncUp}. */ - void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, - WALFileLengthProvider walFileLengthProvider) throws IOException; + void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider) + throws IOException; /** * Start replication services. - * @throws IOException */ void startReplicationService() throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index 2aef0a8..23ba773 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; /** * A source for a replication stream has to expose this service. @@ -28,12 +27,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; */ @InterfaceAudience.Private public interface ReplicationSourceService extends ReplicationService { - /** - * Returns a WALObserver for the service. This is needed to - * observe log rolls and log archival events. - */ - WALActionsListener getWALActionsListener(); - /** * Returns a Handler to handle peer procedures. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 2fa5a9b..aaf3beb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -30,14 +28,10 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @@ -46,12 +40,11 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationTracker; -import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; -import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -61,16 +54,15 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + /** - * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. + * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. */ @InterfaceAudience.Private -public class Replication implements - ReplicationSourceService, ReplicationSinkService, WALActionsListener { +public class Replication implements ReplicationSourceService, ReplicationSinkService { private static final Logger LOG = LoggerFactory.getLogger(Replication.class); - private boolean replicationForBulkLoadData; + private boolean isReplicationForBulkLoadDataEnabled; private ReplicationSourceManager replicationManager; private ReplicationQueueStorage queueStorage; private ReplicationPeers replicationPeers; @@ -88,18 +80,6 @@ public class Replication implements private PeerProcedureHandler peerProcedureHandler; /** - * Instantiate the replication management (if rep is enabled). - * @param server Hosting server - * @param fs handle to the filesystem - * @param logDir - * @param oldLogDir directory where logs are archived - * @throws IOException - */ - public Replication(Server server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException { - initialize(server, fs, logDir, oldLogDir, p -> OptionalLong.empty()); - } - - /** * Empty constructor */ public Replication() { @@ -107,16 +87,17 @@ public class Replication implements @Override public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir, - WALFileLengthProvider walFileLengthProvider) throws IOException { + WALProvider walProvider) throws IOException { this.server = server; this.conf = this.server.getConfiguration(); - this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf); + this.isReplicationForBulkLoadDataEnabled = + ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf); this.scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d") .setDaemon(true) .build()); - if (this.replicationForBulkLoadData) { + if (this.isReplicationForBulkLoadDataEnabled) { if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) { throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID @@ -142,9 +123,28 @@ public class Replication implements } catch (KeeperException ke) { throw new IOException("Could not read cluster id", ke); } - this.replicationManager = - new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, - this.server, fs, logDir, oldLogDir, clusterId, walFileLengthProvider); + this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, + replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, + walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty()); + if (walProvider != null) { + walProvider.addWALActionsListener(new WALActionsListener() { + + @Override + public void preLogRoll(Path oldPath, Path newPath) throws IOException { + replicationManager.preLogRoll(newPath); + } + + @Override + public void postLogRoll(Path oldPath, Path newPath) throws IOException { + replicationManager.postLogRoll(newPath); + } + + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { + replicationManager.scopeWALEdits(logKey, logEdit); + } + }); + } this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); @@ -153,23 +153,6 @@ public class Replication implements this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager); } - /** - * @param c Configuration to look at - * @return True if replication for bulk load data is enabled. - */ - public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) { - return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, - HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); - } - - /* - * Returns an object to listen to new wal changes - **/ - @Override - public WALActionsListener getWALActionsListener() { - return this; - } - @Override public PeerProcedureHandler getPeerProcedureHandler() { return peerProcedureHandler; @@ -225,7 +208,7 @@ public class Replication implements this.replicationManager.init(); this.replicationSink = new ReplicationSink(this.conf, this.server); this.scheduleThreadPool.scheduleAtFixedRate( - new ReplicationStatisticsThread(this.replicationSink, this.replicationManager), + new ReplicationStatisticsTask(this.replicationSink, this.replicationManager), statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS); } @@ -237,45 +220,6 @@ public class Replication implements return this.replicationManager; } - @Override - public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { - scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager()); - } - - /** - * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from - * compaction WAL edits and if the scope is local. - * @param logKey Key that may get scoped according to its edits - * @param logEdit Edits used to lookup the scopes - * @param replicationManager Manager used to add bulk load events hfile references - * @throws IOException If failed to parse the WALEdit - */ - public static void scopeWALEdits(WALKey logKey, - WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager) - throws IOException { - boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf); - boolean foundOtherEdits = false; - for (Cell cell : logEdit.getCells()) { - if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { - foundOtherEdits = true; - break; - } - } - - if (!foundOtherEdits && logEdit.getCells().size() > 0) { - WALProtos.RegionEventDescriptor maybeEvent = - WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0)); - if (maybeEvent != null && (maybeEvent.getEventType() == - WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) { - // In serially replication, we use scopes when reading close marker. - foundOtherEdits = true; - } - } - if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) { - ((WALKeyImpl)logKey).serializeReplicationScope(false); - } - } - void addHFileRefsToQueue(TableName tableName, byte[] family, List> pairs) throws IOException { try { @@ -286,62 +230,16 @@ public class Replication implements } } - @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { - getReplicationManager().preLogRoll(newPath); - } - - @Override - public void postLogRoll(Path oldPath, Path newPath) throws IOException { - getReplicationManager().postLogRoll(newPath); - } - /** - * This method modifies the master's configuration in order to inject replication-related features - * @param conf - */ - public static void decorateMasterConfiguration(Configuration conf) { - String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS); - String cleanerClass = ReplicationLogCleaner.class.getCanonicalName(); - if (!plugins.contains(cleanerClass)) { - conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass); - } - if (isReplicationForBulkLoadDataEnabled(conf)) { - plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS); - cleanerClass = ReplicationHFileCleaner.class.getCanonicalName(); - if (!plugins.contains(cleanerClass)) { - conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass); - } - } - } - - /** - * This method modifies the region server's configuration in order to inject replication-related - * features - * @param conf region server configurations - */ - public static void decorateRegionServerConfiguration(Configuration conf) { - if (isReplicationForBulkLoadDataEnabled(conf)) { - String plugins = conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, ""); - String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName(); - if (!plugins.contains(rsCoprocessorClass)) { - conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, - plugins + "," + rsCoprocessorClass); - } - } - } - - /* - * Statistics thread. Periodically prints the cache statistics to the log. + * Statistics task. Periodically prints the cache statistics to the log. */ - static class ReplicationStatisticsThread extends Thread { + private final static class ReplicationStatisticsTask implements Runnable { private final ReplicationSink replicationSink; private final ReplicationSourceManager replicationManager; - public ReplicationStatisticsThread(final ReplicationSink replicationSink, - final ReplicationSourceManager replicationManager) { - super("ReplicationStatisticsThread"); + public ReplicationStatisticsTask(ReplicationSink replicationSink, + ReplicationSourceManager replicationManager) { this.replicationManager = replicationManager; this.replicationSink = replicationSink; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index ab86d7c..6a4e03e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -43,6 +43,8 @@ import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Server; @@ -57,9 +59,13 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationTracker; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +73,8 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + /** * This class is responsible to manage all the replication sources. There are two classes of * sources: @@ -86,14 +94,15 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto * operations. *
  • Need synchronized on {@link #walsById}. There are four methods which modify it, * {@link #addPeer(String)}, {@link #removePeer(String)}, - * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}. {@link #walsById} - * is a ConcurrentHashMap and there is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So - * there is no race between {@link #addPeer(String)} and {@link #removePeer(String)}. - * {@link #cleanOldLogs(SortedSet, String, String)} is called by {@link ReplicationSourceInterface}. - * So no race with {@link #addPeer(String)}. {@link #removePeer(String)} will terminate the - * {@link ReplicationSourceInterface} firstly, then remove the wals from {@link #walsById}. So no - * race with {@link #removePeer(String)}. The only case need synchronized is - * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}.
  • + * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path, Path)}. + * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in + * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and + * {@link #removePeer(String)}. {@link #cleanOldLogs(SortedSet, String, String)} is called by + * {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}. + * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then + * remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only + * case need synchronized is {@link #cleanOldLogs(SortedSet, String, String)} and + * {@link #preLogRoll(Path, Path)}. *
  • No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which * modify it, {@link #removePeer(String)} , {@link #cleanOldLogs(SortedSet, String, String)} and * {@link ReplicationSourceManager.NodeFailoverWorker#run()}. @@ -533,7 +542,9 @@ public class ReplicationSourceManager implements ReplicationListener { walSet.clear(); } - void preLogRoll(Path newLog) throws IOException { + // public because of we call it in TestReplicationEmptyWALRecovery + @VisibleForTesting + public void preLogRoll(Path newLog) throws IOException { String logName = newLog.getName(); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); // synchronized on latestPaths to avoid the new open source miss the new log @@ -588,13 +599,52 @@ public class ReplicationSourceManager implements ReplicationListener { } } - void postLogRoll(Path newLog) throws IOException { + // public because of we call it in TestReplicationEmptyWALRecovery + @VisibleForTesting + public void postLogRoll(Path newLog) throws IOException { // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources.values()) { source.enqueueLog(newLog); } } + void scopeWALEdits(WALKey logKey, WALEdit logEdit) throws IOException { + scopeWALEdits(logKey, logEdit, this.conf); + } + + /** + * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from + * compaction WAL edits and if the scope is local. + * @param logKey Key that may get scoped according to its edits + * @param logEdit Edits used to lookup the scopes + * @throws IOException If failed to parse the WALEdit + */ + @VisibleForTesting + static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException { + boolean replicationForBulkLoadEnabled = + ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf); + boolean foundOtherEdits = false; + for (Cell cell : logEdit.getCells()) { + if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { + foundOtherEdits = true; + break; + } + } + + if (!foundOtherEdits && logEdit.getCells().size() > 0) { + WALProtos.RegionEventDescriptor maybeEvent = + WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0)); + if (maybeEvent != null && + (maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) { + // In serially replication, we use scopes when reading close marker. + foundOtherEdits = true; + } + } + if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) { + ((WALKeyImpl) logKey).serializeReplicationScope(false); + } + } + @Override public void regionServerRemoved(String regionserver) { transferQueues(ServerName.valueOf(regionserver)); @@ -886,7 +936,6 @@ public class ReplicationSourceManager implements ReplicationListener { */ void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId) throws IOException, InterruptedException { - /** * There are barriers for this region and position for this peer. N barriers form N intervals, * (b1,b2) (b2,b3) ... (bn,max). Generally, there is no logs whose seq id is not greater than @@ -974,5 +1023,4 @@ public class ReplicationSourceManager implements ReplicationListener { Thread.sleep(replicationWaitTime); } } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index 9ec244a..01a230d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -99,7 +99,8 @@ public class ReplicationSyncUp extends Configured implements Tool { logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); System.out.println("Start Replication Server start"); - replication = new Replication(new DummyServer(zkw), fs, logDir, oldLogDir); + replication = new Replication(); + replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null); manager = replication.getReplicationManager(); manager.init().get(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index b4a22e4..361bb51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -114,8 +114,6 @@ import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; -import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.UserProvider; @@ -1482,9 +1480,7 @@ public class HBaseFsck extends Configured implements Closeable { // unless I pass along via the conf. Configuration confForWAL = new Configuration(c); confForWAL.set(HConstants.HBASE_DIR, rootdir.toString()); - WAL wal = - new WALFactory(confForWAL, Collections. singletonList(new MetricsWAL()), - walFactoryID).getWAL(metaHRI); + WAL wal = new WALFactory(confForWAL, walFactoryID).getWAL(metaHRI); HRegion meta = HRegion.createHRegion(metaHRI, rootdir, c, metaDescriptor, wal); MasterFileSystem.setInfoFamilyCachingForMeta(metaDescriptor, true); return meta; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index d9badfa..231afd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -76,13 +76,13 @@ public abstract class AbstractFSWALProvider> implemen } protected volatile T wal; - protected WALFactory factory = null; - protected Configuration conf = null; - protected List listeners = null; - protected String providerId = null; + protected WALFactory factory; + protected Configuration conf; + protected List listeners = new ArrayList<>(); + protected String providerId; protected AtomicBoolean initialized = new AtomicBoolean(false); // for default wal provider, logPrefix won't change - protected String logPrefix = null; + protected String logPrefix; /** * we synchronized on walCreateLock to prevent wal recreation in different threads @@ -92,19 +92,16 @@ public abstract class AbstractFSWALProvider> implemen /** * @param factory factory that made us, identity used for FS layout. may not be null * @param conf may not be null - * @param listeners may be null * @param providerId differentiate between providers from one factory, used for FS layout. may be * null */ @Override - public void init(WALFactory factory, Configuration conf, List listeners, - String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { if (!initialized.compareAndSet(false, true)) { throw new IllegalStateException("WALProvider.init should only be called once."); } this.factory = factory; this.conf = conf; - this.listeners = listeners; this.providerId = providerId; // get log prefix StringBuilder sb = new StringBuilder().append(factory.factoryId); @@ -249,8 +246,8 @@ public abstract class AbstractFSWALProvider> implemen * Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for * description. */ - private static final Pattern pattern = Pattern - .compile(".*\\.\\d*(" + META_WAL_PROVIDER_ID + ")*"); + private static final Pattern pattern = + Pattern.compile(".*\\.\\d*(" + META_WAL_PROVIDER_ID + ")*"); /** * A WAL file name is of the format: <wal-name>{@link #WAL_FILE_NAME_DELIMITER} @@ -264,8 +261,8 @@ public abstract class AbstractFSWALProvider> implemen } /** - * Construct the directory name for all WALs on a given server. Dir names currently look like - * this for WALs: hbase//WALs/kalashnikov.att.net,61634,1486865297088. + * Construct the directory name for all WALs on a given server. Dir names currently look like this + * for WALs: hbase//WALs/kalashnikov.att.net,61634,1486865297088. * @param serverName Server name formatted as described in {@link ServerName} * @return the relative WAL directory name, e.g. .logs/1.example.org,60030,12345 if * serverName passed is 1.example.org,60030,12345 @@ -278,9 +275,9 @@ public abstract class AbstractFSWALProvider> implemen } /** - * Construct the directory name for all old WALs on a given server. The default old WALs dir - * looks like: hbase/oldWALs. If you config hbase.separate.oldlogdir.by.regionserver - * to true, it looks like hbase//oldWALs/kalashnikov.att.net,61634,1486865297088. + * Construct the directory name for all old WALs on a given server. The default old WALs dir looks + * like: hbase/oldWALs. If you config hbase.separate.oldlogdir.by.regionserver to + * true, it looks like hbase//oldWALs/kalashnikov.att.net,61634,1486865297088. * @param conf * @param serverName Server name formatted as described in {@link ServerName} * @return the relative WAL directory name @@ -372,7 +369,7 @@ public abstract class AbstractFSWALProvider> implemen } try { serverName = ServerName.parseServerName(logDirName); - } catch (IllegalArgumentException|IllegalStateException ex) { + } catch (IllegalArgumentException | IllegalStateException ex) { serverName = null; LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage()); } @@ -430,16 +427,14 @@ public abstract class AbstractFSWALProvider> implemen } /** - * Opens WAL reader with retries and - * additional exception handling + * Opens WAL reader with retries and additional exception handling * @param path path to WAL file * @param conf configuration * @return WAL Reader instance * @throws IOException */ - public static org.apache.hadoop.hbase.wal.WAL.Reader - openReader(Path path, Configuration conf) - throws IOException + public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf) + throws IOException { long retryInterval = 2000; // 2 sec @@ -503,6 +498,10 @@ public abstract class AbstractFSWALProvider> implemen } } + @Override + public void addWALActionsListener(WALActionsListener listener) { + listeners.add(listener); + } /** * Get prefix of the log from its name, assuming WAL name in format of diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 2105490..1e750e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -55,8 +55,7 @@ class DisabledWALProvider implements WALProvider { WAL disabled; @Override - public void init(final WALFactory factory, final Configuration conf, - final List listeners, String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { if (null != disabled) { throw new IllegalStateException("WALProvider.init should only be called once."); } @@ -250,4 +249,9 @@ class DisabledWALProvider implements WALProvider { public long getLogFileSize() { return 0; } + + @Override + public void addWALActionsListener(WALActionsListener listener) { + disabled.registerWALActionsListener(listener); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index f1662bc..b0a924f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -19,19 +19,17 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// imports for things that haven't moved from regionserver.wal yet. -import org.apache.hadoop.hbase.regionserver.wal.FSHLog; -import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; -import org.apache.hadoop.hbase.util.CommonFSUtils; /** * A WAL provider that use {@link FSHLog}. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index a0ef817..28817e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -23,11 +23,11 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DE import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; @@ -130,20 +130,18 @@ public class RegionGroupingProvider implements WALProvider { private final KeyLocker createLock = new KeyLocker<>(); - private RegionGroupingStrategy strategy = null; - private WALFactory factory = null; - private List listeners = null; - private String providerId = null; + private RegionGroupingStrategy strategy; + private WALFactory factory; + private List listeners = new ArrayList<>(); + private String providerId; private Class providerClass; @Override - public void init(final WALFactory factory, final Configuration conf, - final List listeners, final String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { if (null != strategy) { throw new IllegalStateException("WALProvider.init should only be called once."); } this.factory = factory; - this.listeners = null == listeners ? null : Collections.unmodifiableList(listeners); StringBuilder sb = new StringBuilder().append(factory.factoryId); if (providerId != null) { if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) { @@ -159,19 +157,15 @@ public class RegionGroupingProvider implements WALProvider { private WALProvider createProvider(String group) throws IOException { if (META_WAL_PROVIDER_ID.equals(providerId)) { - return factory.createProvider(providerClass, listeners, META_WAL_PROVIDER_ID); + return factory.createProvider(providerClass, META_WAL_PROVIDER_ID); } else { - return factory.createProvider(providerClass, listeners, group); + return factory.createProvider(providerClass, group); } } @Override public List getWALs() { - List wals = new ArrayList<>(); - for (WALProvider provider : cached.values()) { - wals.addAll(provider.getWALs()); - } - return wals; + return cached.values().stream().flatMap(p -> p.getWALs().stream()).collect(Collectors.toList()); } private WAL getWAL(String group) throws IOException { @@ -182,6 +176,7 @@ public class RegionGroupingProvider implements WALProvider { provider = cached.get(group); if (provider == null) { provider = createProvider(group); + listeners.forEach(provider::addWALActionsListener); cached.put(group, provider); } } finally { @@ -277,4 +272,14 @@ public class RegionGroupingProvider implements WALProvider { } return logFileSize; } + + @Override + public void addWALActionsListener(WALActionsListener listener) { + // Notice that there is an assumption that this method must be called before the getWAL above, + // so we can make sure there is no sub WALProvider yet, so we only add the listener to our + // listeners list without calling addWALActionListener for each WALProvider. Although it is no + // hurt to execute an extra loop to call addWALActionListener for each WALProvider, but if the + // extra code actually works, then we will have other big problems. So leave it as is. + listeners.add(listener); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index d59c824..1410b53 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -19,19 +19,14 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; import java.io.InterruptedIOException; -import java.util.Collections; import java.util.List; -import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.RegionInfo; -// imports for things that haven't moved from regionserver.wal yet. import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; @@ -62,7 +57,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name. */ @InterfaceAudience.Private -public class WALFactory implements WALFileLengthProvider { +public class WALFactory { private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class); @@ -135,12 +130,12 @@ public class WALFactory implements WALFileLengthProvider { } } - WALProvider createProvider(Class clazz, - List listeners, String providerId) throws IOException { + WALProvider createProvider(Class clazz, String providerId) + throws IOException { LOG.info("Instantiating WALProvider of type " + clazz); try { final WALProvider result = clazz.getDeclaredConstructor().newInstance(); - result.init(this, conf, listeners, providerId); + result.init(this, conf, providerId); return result; } catch (Exception e) { LOG.error("couldn't set up WALProvider, the configured class is " + clazz); @@ -150,24 +145,23 @@ public class WALFactory implements WALFileLengthProvider { } /** - * instantiate a provider from a config property. - * requires conf to have already been set (as well as anything the provider might need to read). + * instantiate a provider from a config property. requires conf to have already been set (as well + * as anything the provider might need to read). */ - WALProvider getProvider(final String key, final String defaultValue, - final List listeners, final String providerId) throws IOException { + WALProvider getProvider(String key, String defaultValue, String providerId) throws IOException { Class clazz = getProviderClass(key, defaultValue); - return createProvider(clazz, listeners, providerId); + WALProvider provider = createProvider(clazz, providerId); + provider.addWALActionsListener(new MetricsWAL()); + return provider; } /** * @param conf must not be null, will keep a reference to read params in later reader/writer - * instances. - * @param listeners may be null. will be given to all created wals (and not meta-wals) + * instances. * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations - * to make a directory + * to make a directory */ - public WALFactory(final Configuration conf, final List listeners, - final String factoryId) throws IOException { + public WALFactory(Configuration conf, String factoryId) throws IOException { // until we've moved reader/writer construction down into providers, this initialization must // happen prior to provider initialization, in case they need to instantiate a reader/writer. timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); @@ -178,12 +172,12 @@ public class WALFactory implements WALFileLengthProvider { this.factoryId = factoryId; // end required early initialization if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) { - provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, listeners, null); + provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null); } else { // special handling of existing configuration behavior. LOG.warn("Running with WAL disabled."); provider = new DisabledWALProvider(); - provider.init(this, conf, null, factoryId); + provider.init(this, conf, factoryId); } } @@ -236,7 +230,6 @@ public class WALFactory implements WALFileLengthProvider { return provider; } provider = getProvider(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER, - Collections. singletonList(new MetricsWAL()), AbstractFSWALProvider.META_WAL_PROVIDER_ID); if (metaProvider.compareAndSet(null, provider)) { return provider; @@ -448,9 +441,4 @@ public class WALFactory implements WALFileLengthProvider { public final WALProvider getMetaWALProvider() { return this.metaProvider.get(); } - - @Override - public OptionalLong getLogFileSizeIfBeingWritten(Path path) { - return getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path)).filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty()); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index 0586d1d..7ad815e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -21,34 +21,31 @@ package org.apache.hadoop.hbase.wal; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.yetus.audience.InterfaceAudience; /** - * The Write Ahead Log (WAL) stores all durable edits to the HRegion. - * This interface provides the entry point for all WAL implementors. + * The Write Ahead Log (WAL) stores all durable edits to the HRegion. This interface provides the + * entry point for all WAL implementors. *

    - * See {@link FSHLogProvider} for an example implementation. - * - * A single WALProvider will be used for retrieving multiple WALs in a particular region server - * and must be threadsafe. + * See {@link FSHLogProvider} for an example implementation. A single WALProvider will be used for + * retrieving multiple WALs in a particular region server and must be threadsafe. */ @InterfaceAudience.Private public interface WALProvider { /** - * Set up the provider to create wals. - * will only be called once per instance. + * Set up the provider to create wals. will only be called once per instance. * @param factory factory that made us may not be null * @param conf may not be null - * @param listeners may be null * @param providerId differentiate between providers from one factory. may be null */ - void init(WALFactory factory, Configuration conf, List listeners, - String providerId) throws IOException; + void init(WALFactory factory, Configuration conf, String providerId) throws IOException; /** * @param region the region which we want to get a WAL for it. Could be null. @@ -62,16 +59,16 @@ public interface WALProvider { List getWALs(); /** - * persist outstanding WALs to storage and stop accepting new appends. - * This method serves as shorthand for sending a sync to every WAL provided by a given - * implementation. Those WALs will also stop accepting new writes. + * persist outstanding WALs to storage and stop accepting new appends. This method serves as + * shorthand for sending a sync to every WAL provided by a given implementation. Those WALs will + * also stop accepting new writes. */ void shutdown() throws IOException; /** - * shutdown utstanding WALs and clean up any persisted state. - * Call this method only when you will not need to replay any of the edits to the WALs from - * this provider. After this call completes, the underlying resources should have been reclaimed. + * shutdown utstanding WALs and clean up any persisted state. Call this method only when you will + * not need to replay any of the edits to the WALs from this provider. After this call completes, + * the underlying resources should have been reclaimed. */ void close() throws IOException; @@ -83,11 +80,13 @@ public interface WALProvider { // interface provided by WAL. interface Writer extends WriterBase { void sync() throws IOException; + void append(WAL.Entry entry) throws IOException; } interface AsyncWriter extends WriterBase { CompletableFuture sync(); + void append(WAL.Entry entry); } @@ -101,4 +100,17 @@ public interface WALProvider { */ long getLogFileSize(); + /** + * Add a {@link WALActionsListener}. + *

    + * Notice that you must call this method before calling {@link #getWAL(RegionInfo)} as this method + * will not effect the {@link WAL} which has already been created. And as long as we can only it + * when initialization, it is not thread safe. + */ + void addWALActionsListener(WALActionsListener listener); + + default WALFileLengthProvider getWALFileLengthProvider() { + return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path)) + .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty()); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 75abd5e..658fe4f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -112,8 +112,6 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; -import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.security.HBaseKerberosUtils; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache; @@ -2310,9 +2308,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { // unless I pass along via the conf. Configuration confForWAL = new Configuration(conf); confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); - return (new WALFactory(confForWAL, - Collections. singletonList(new MetricsWAL()), - "hregion-" + RandomStringUtils.randomNumeric(8))).getWAL(hri); + return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index df80fa0..3ee7020 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -155,7 +155,7 @@ public class TestWALObserver { if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) { TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true); } - this.wals = new WALFactory(conf, null, serverName); + this.wals = new WALFactory(conf, serverName); } @After @@ -353,8 +353,9 @@ public class TestWALObserver { Path p = runWALSplit(newConf); LOG.info("WALSplit path == " + p); // Make a new wal for new region open. - final WALFactory wals2 = new WALFactory(conf, null, - ServerName.valueOf(currentTest.getMethodName()+"2", 16010, System.currentTimeMillis()).toString()); + final WALFactory wals2 = new WALFactory(conf, + ServerName.valueOf(currentTest.getMethodName() + "2", 16010, System.currentTimeMillis()) + .toString()); WAL wal2 = wals2.getWAL(null); HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index d30fe9f..2f518c7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -45,10 +45,10 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; -import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -114,7 +114,7 @@ public class TestLogsCleaner { conf.setLong("hbase.master.logcleaner.ttl", ttlWAL); conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL); - Replication.decorateMasterConfiguration(conf); + HMaster.decorateMasterConfiguration(conf); Server server = new DummyServer(); ReplicationQueueStorage queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index d3385e7..08dd428 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -50,7 +51,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; -import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Pair; @@ -93,7 +93,7 @@ public class TestReplicationHFileCleaner { TEST_UTIL.startMiniZKCluster(); server = new DummyServer(); conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); - Replication.decorateMasterConfiguration(conf); + HMaster.decorateMasterConfiguration(conf); rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf); rp.init(); rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java index 5792106..f26998b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java @@ -178,7 +178,7 @@ public class TestCacheOnWriteInSchema { fs.delete(logdir, true); RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); - walFactory = new WALFactory(conf, null, id); + walFactory = new WALFactory(conf, id); region = TEST_UTIL.createLocalHRegion(info, htd, walFactory.getWAL(info)); store = new HStore(region, hcd, conf); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java index aa5365c..225c723 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveConcurrentClose.java @@ -175,7 +175,7 @@ public class TestCompactionArchiveConcurrentClose { ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, tableDir); - final WALFactory wals = new WALFactory(walConf, null, "log_" + info.getEncodedName()); + final WALFactory wals = new WALFactory(walConf, "log_" + info.getEncodedName()); HRegion region = new HRegion(fs, wals.getWAL(info), conf, htd, null); region.initialize(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java index b8780af..4c6cf6a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionArchiveIOException.java @@ -196,7 +196,7 @@ public class TestCompactionArchiveIOException { HRegionFileSystem fs = new HRegionFileSystem(conf, errFS, tableDir, info); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, tableDir); - final WALFactory wals = new WALFactory(walConf, null, "log_" + info.getEncodedName()); + final WALFactory wals = new WALFactory(walConf, "log_" + info.getEncodedName()); HRegion region = new HRegion(fs, wals.getWAL(info), conf, htd, null); region.initialize(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index cd8539a..b7e0164 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -946,7 +946,7 @@ public class TestDefaultMemStore { EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); EnvironmentEdgeManager.injectEdge(edge); edge.setCurrentTimeMillis(1234); - WALFactory wFactory = new WALFactory(conf, null, "1234"); + WALFactory wFactory = new WALFactory(conf, "1234"); HRegion meta = HRegion.createHRegion(RegionInfoBuilder.FIRST_META_REGIONINFO, testDir, conf, FSTableDescriptors.createMetaTableDescriptor(conf), wFactory.getWAL(RegionInfoBuilder.FIRST_META_REGIONINFO)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java index e8dbdac..9c5a667 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -165,7 +165,7 @@ public class TestHMobStore { ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, basedir); - final WALFactory wals = new WALFactory(walConf, null, methodName); + final WALFactory wals = new WALFactory(walConf, methodName); region = new HRegion(tableDir, wals.getWAL(info), fs, conf, info, htd, null); store = new HMobStore(region, hcd, conf); if(testStore) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 0d9db31..88e1aa2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -47,7 +47,6 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -139,9 +138,7 @@ import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; -import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.test.MetricsAssertHelper; @@ -381,8 +378,8 @@ public class TestHRegion { final Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log"); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, logDir); - return new WALFactory(walConf, Collections. singletonList(new MetricsWAL()), - callingMethod).getWAL(RegionInfoBuilder.newBuilder(tableName).build()); + return new WALFactory(walConf, callingMethod) + .getWAL(RegionInfoBuilder.newBuilder(tableName).build()); } @Test @@ -642,7 +639,7 @@ public class TestHRegion { public void testSkipRecoveredEditsReplay() throws Exception { byte[] family = Bytes.toBytes("family"); this.region = initHRegion(tableName, method, CONF, family); - final WALFactory wals = new WALFactory(CONF, null, method); + final WALFactory wals = new WALFactory(CONF, method); try { Path regiondir = region.getRegionFileSystem().getRegionDir(); FileSystem fs = region.getRegionFileSystem().getFileSystem(); @@ -693,7 +690,7 @@ public class TestHRegion { public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception { byte[] family = Bytes.toBytes("family"); this.region = initHRegion(tableName, method, CONF, family); - final WALFactory wals = new WALFactory(CONF, null, method); + final WALFactory wals = new WALFactory(CONF, method); try { Path regiondir = region.getRegionFileSystem().getRegionDir(); FileSystem fs = region.getRegionFileSystem().getFileSystem(); @@ -781,7 +778,7 @@ public class TestHRegion { public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception { byte[] family = Bytes.toBytes("family"); this.region = initHRegion(tableName, method, CONF, family); - final WALFactory wals = new WALFactory(CONF, null, method); + final WALFactory wals = new WALFactory(CONF, method); try { Path regiondir = region.getRegionFileSystem().getRegionDir(); FileSystem fs = region.getRegionFileSystem().getFileSystem(); @@ -850,7 +847,7 @@ public class TestHRegion { CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class); byte[] family = Bytes.toBytes("family"); this.region = initHRegion(tableName, method, CONF, family); - final WALFactory wals = new WALFactory(CONF, null, method); + final WALFactory wals = new WALFactory(CONF, method); try { Path regiondir = region.getRegionFileSystem().getRegionDir(); FileSystem fs = region.getRegionFileSystem().getFileSystem(); @@ -961,7 +958,7 @@ public class TestHRegion { Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log"); final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration()); FSUtils.setRootDir(walConf, logDir); - final WALFactory wals = new WALFactory(walConf, null, method); + final WALFactory wals = new WALFactory(walConf, method); final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()); this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, @@ -4694,7 +4691,7 @@ public class TestHRegion { // XXX: The spied AsyncFSWAL can not work properly because of a Mockito defect that can not // deal with classes which have a field of an inner class. See discussions in HBASE-15536. walConf.set(WALFactory.WAL_PROVIDER, "filesystem"); - final WALFactory wals = new WALFactory(walConf, null, UUID.randomUUID().toString()); + final WALFactory wals = new WALFactory(walConf, UUID.randomUUID().toString()); final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build())); this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, tableDurability, wal, @@ -4844,9 +4841,7 @@ public class TestHRegion { static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException { Configuration confForWAL = new Configuration(conf); confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); - return new WALFactory(confForWAL, - Collections.singletonList(new MetricsWAL()), - "hregion-" + RandomStringUtils.randomNumeric(8)); + return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 9d6aedb..013597a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -216,7 +216,7 @@ public class TestHStore { RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, basedir); - WALFactory wals = new WALFactory(walConf, null, methodName); + WALFactory wals = new WALFactory(walConf, methodName); region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf, htd, null); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java index 1e3bdcd..5b0a60f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java @@ -117,7 +117,7 @@ public class TestStoreFileRefresherChore { new FailingHRegionFileSystem(conf, tableDir.getFileSystem(conf), tableDir, info); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, tableDir); - final WALFactory wals = new WALFactory(walConf, null, "log_" + replicaId); + final WALFactory wals = new WALFactory(walConf, "log_" + replicaId); ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegion region = new HRegion(fs, wals.getWAL(info), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java index cdc1572..c7a2a7c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java @@ -121,7 +121,7 @@ public class TestWALMonotonicallyIncreasingSeqId { final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, tableDir); this.walConf = walConf; - wals = new WALFactory(walConf, null, "log_" + replicaId); + wals = new WALFactory(walConf, "log_" + replicaId); ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegion region = HRegion.createHRegion(info, TEST_UTIL.getDefaultRootDirPath(), conf, htd, wals.getWAL(info)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java index 37c3b37..c6059b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -159,11 +159,11 @@ public abstract class AbstractTestLogRolling { /** * Tests that log rolling doesn't hang when no data is written. */ - @Test(timeout = 120000) + @Test public void testLogRollOnNothingWritten() throws Exception { final Configuration conf = TEST_UTIL.getConfiguration(); final WALFactory wals = - new WALFactory(conf, null, ServerName.valueOf("test.com", 8080, 1).toString()); + new WALFactory(conf, ServerName.valueOf("test.com", 8080, 1).toString()); final WAL newLog = wals.getWAL(null); try { // Now roll the log before we write anything. @@ -183,8 +183,6 @@ public abstract class AbstractTestLogRolling { /** * Tests that logs are deleted - * @throws IOException - * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException */ @Test public void testLogRolling() throws Exception { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java index c0510d3..e49cda0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java @@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue; import java.io.Closeable; import java.io.IOException; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -71,7 +70,7 @@ public abstract class AbstractTestProtobufLog { public void setUp() throws Exception { fs = TEST_UTIL.getDFSCluster().getFileSystem(); dir = new Path(TEST_UTIL.createRootDir(), currentTest.getMethodName()); - wals = new WALFactory(TEST_UTIL.getConfiguration(), null, currentTest.getMethodName()); + wals = new WALFactory(TEST_UTIL.getConfiguration(), currentTest.getMethodName()); } @After diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index d18b75c..b1e304e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -22,9 +22,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -43,7 +43,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -155,14 +154,14 @@ public abstract class AbstractTestWALReplay { this.hbaseRootDir = FSUtils.getRootDir(this.conf); this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); String serverName = - ServerName.valueOf(currentTest.getMethodName() + "-manual", 16010, - System.currentTimeMillis()).toString(); + ServerName.valueOf(currentTest.getMethodName() + "-manual", 16010, System.currentTimeMillis()) + .toString(); this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName); this.logDir = new Path(this.hbaseRootDir, logName); if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); } - this.wals = new WALFactory(conf, null, currentTest.getMethodName()); + this.wals = new WALFactory(conf, currentTest.getMethodName()); } @After diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java index b7aa0e3..f5fabbc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java @@ -123,7 +123,7 @@ public class TestDurability { @Test public void testDurability() throws Exception { - WALFactory wals = new WALFactory(CONF, null, + WALFactory wals = new WALFactory(CONF, ServerName.valueOf("TestDurability", 16010, System.currentTimeMillis()).toString()); HRegion region = createHRegion(wals, Durability.USE_DEFAULT); WAL wal = region.getWAL(); @@ -187,7 +187,7 @@ public class TestDurability { byte[] col3 = Bytes.toBytes("col3"); // Setting up region - WALFactory wals = new WALFactory(CONF, null, + WALFactory wals = new WALFactory(CONF, ServerName.valueOf("TestIncrement", 16010, System.currentTimeMillis()).toString()); HRegion region = createHRegion(wals, Durability.USE_DEFAULT); WAL wal = region.getWAL(); @@ -253,7 +253,7 @@ public class TestDurability { byte[] col1 = Bytes.toBytes("col1"); // Setting up region - WALFactory wals = new WALFactory(CONF, null, + WALFactory wals = new WALFactory(CONF, ServerName .valueOf("testIncrementWithReturnResultsSetToFalse", 16010, System.currentTimeMillis()) .toString()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index e27fb97..3476aaf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -191,7 +191,7 @@ public class TestLogRollAbort { String logName = ServerName.valueOf("testLogRollAfterSplitStart", 16010, System.currentTimeMillis()).toString(); Path thisTestsDir = new Path(HBASELOGDIR, AbstractFSWALProvider.getWALDirectoryName(logName)); - final WALFactory wals = new WALFactory(conf, null, logName); + final WALFactory wals = new WALFactory(conf, logName); try { // put some entries in an WAL diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index c83e4e7..5ee0dfa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -91,7 +91,7 @@ public class TestLogRollingNoCluster { conf.set(WALFactory.WAL_PROVIDER, "filesystem"); FSUtils.setRootDir(conf, dir); conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName()); - final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName()); + final WALFactory wals = new WALFactory(conf, TestLogRollingNoCluster.class.getName()); final WAL wal = wals.getWAL(null); Appender [] appenders = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index d7d3166..0967a75 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertEquals; -import java.util.ArrayList; -import java.util.List; import java.util.NavigableMap; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; @@ -99,9 +97,8 @@ public class TestWALActionsListener { @Test public void testActionListener() throws Exception { DummyWALActionsListener observer = new DummyWALActionsListener(); - List list = new ArrayList<>(1); - list.add(observer); - final WALFactory wals = new WALFactory(conf, list, "testActionListener"); + final WALFactory wals = new WALFactory(conf, "testActionListener"); + wals.getWALProvider().addWALActionsListener(observer); DummyWALActionsListener laterobserver = new DummyWALActionsListener(); RegionInfo hri = RegionInfoBuilder.newBuilder(TableName.valueOf(SOME_BYTES)) .setStartKey(SOME_BYTES).setEndKey(SOME_BYTES).build(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java index fe773cb..4effe41 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java @@ -104,10 +104,10 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase { for (int i = 0; i < numRs; i++) { HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i); Replication replicationService = (Replication) hrs.getReplicationSourceService(); - replicationService.preLogRoll(null, emptyWalPaths.get(i)); - replicationService.postLogRoll(null, emptyWalPaths.get(i)); + replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i)); + replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i)); RegionInfo regionInfo = - utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); + utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); WAL wal = hrs.getWAL(regionInfo); wal.rollWriter(true); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index f4d4d71..f5d2a2d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -45,7 +44,6 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.replication.TableCFs; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -60,8 +58,6 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; - @Category({ ReplicationTests.class, LargeTests.class }) public class TestReplicationSmallTests extends TestReplicationBase { @@ -333,20 +329,6 @@ public class TestReplicationSmallTests extends TestReplicationBase { } /** - * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the - * compaction WALEdit. - */ - @Test - public void testCompactionWALEdits() throws Exception { - WALProtos.CompactionDescriptor compactionDescriptor = - WALProtos.CompactionDescriptor.getDefaultInstance(); - RegionInfo hri = RegionInfoBuilder.newBuilder(htable1.getName()) - .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.EMPTY_END_ROW).build(); - WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor); - Replication.scopeWALEdits(new WALKeyImpl(), edit, htable1.getConfiguration(), null); - } - - /** * Test for HBASE-8663 *

    * Create two new Tables with colfamilies enabled for replication then run diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 6bdbbd2..a8afe2d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -59,6 +59,8 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @@ -80,6 +82,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; @@ -100,6 +103,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; /** @@ -179,8 +183,8 @@ public abstract class TestReplicationSourceManager { HConstants.HREGION_OLDLOGDIR_NAME); logDir = new Path(utility.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME); - replication = new Replication(new DummyServer(), fs, logDir, oldLogDir); - + replication = new Replication(); + replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null); managerOfCluster = getManagerFromCluster(); if (managerOfCluster != null) { // After replication procedure, we need to add peer by hand (other than by receiving @@ -269,10 +273,26 @@ public abstract class TestReplicationSourceManager { WALEdit edit = new WALEdit(); edit.add(kv); - List listeners = new ArrayList<>(1); - listeners.add(replication); - final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners, - URLEncoder.encode("regionserver:60020", "UTF8")); + WALFactory wals = + new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8")); + ReplicationSourceManager replicationManager = replication.getReplicationManager(); + wals.getWALProvider().addWALActionsListener(new WALActionsListener() { + + @Override + public void preLogRoll(Path oldPath, Path newPath) throws IOException { + replicationManager.preLogRoll(newPath); + } + + @Override + public void postLogRoll(Path oldPath, Path newPath) throws IOException { + replicationManager.postLogRoll(newPath); + } + + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { + replicationManager.scopeWALEdits(logKey, logEdit); + } + }); final WAL wal = wals.getWAL(hri); manager.init(); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame")); @@ -418,6 +438,21 @@ public abstract class TestReplicationSourceManager { } } + /** + * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the + * compaction WALEdit. + */ + @Test + public void testCompactionWALEdits() throws Exception { + TableName tableName = TableName.valueOf("testCompactionWALEdits"); + WALProtos.CompactionDescriptor compactionDescriptor = + WALProtos.CompactionDescriptor.getDefaultInstance(); + RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW) + .setEndKey(HConstants.EMPTY_END_ROW).build(); + WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor); + ReplicationSourceManager.scopeWALEdits(new WALKeyImpl(), edit, conf); + } + @Test public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception { NavigableMap scope = new TreeMap<>(Bytes.BYTES_COMPARATOR); @@ -427,7 +462,7 @@ public abstract class TestReplicationSourceManager { WALKeyImpl logKey = new WALKeyImpl(scope); // 3. Get the scopes for the key - Replication.scopeWALEdits(logKey, logEdit, conf, manager); + ReplicationSourceManager.scopeWALEdits(logKey, logEdit, conf); // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled assertNull("No bulk load entries scope should be added if bulk load replication is disabled.", @@ -446,7 +481,7 @@ public abstract class TestReplicationSourceManager { bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); // 4. Get the scopes for the key - Replication.scopeWALEdits(logKey, logEdit, bulkLoadConf, manager); + ReplicationSourceManager.scopeWALEdits(logKey, logEdit, bulkLoadConf); NavigableMap scopes = logKey.getReplicationScopes(); // Assert family with replication scope global is present in the key scopes diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index d89c9c2..2146e47 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -26,8 +26,6 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.when; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.NavigableMap; import java.util.NoSuchElementException; import java.util.OptionalLong; @@ -119,10 +117,9 @@ public class TestWALEntryStream { @Before public void setUp() throws Exception { walQueue = new PriorityBlockingQueue<>(); - List listeners = new ArrayList(); pathWatcher = new PathWatcher(); - listeners.add(pathWatcher); - final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName()); + final WALFactory wals = new WALFactory(conf, tn.getMethodName()); + wals.getWALProvider().addWALActionsListener(pathWatcher); log = wals.getWAL(info); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java index 485e5b8..3928d9c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DE import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -79,8 +80,15 @@ public class IOTestProvider implements WALProvider { none } - private FSHLog log = null; + private WALFactory factory; + private Configuration conf; + + private volatile FSHLog log; + + private String providerId; + + private List listeners = new ArrayList<>(); /** * @param factory factory that made us, identity used for FS layout. may not be null * @param conf may not be null @@ -89,41 +97,60 @@ public class IOTestProvider implements WALProvider { * null */ @Override - public void init(final WALFactory factory, final Configuration conf, - final List listeners, String providerId) throws IOException { - if (null != log) { + public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { + if (factory != null) { throw new IllegalStateException("WALProvider.init should only be called once."); } - if (null == providerId) { - providerId = DEFAULT_PROVIDER_ID; - } - final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId; - log = new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), - AbstractFSWALProvider.getWALDirectoryName(factory.factoryId), - HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix, - META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); + this.factory = factory; + this.conf = conf; + this.providerId = providerId != null ? providerId : DEFAULT_PROVIDER_ID; + + } @Override public List getWALs() { - List wals = new ArrayList<>(1); - wals.add(log); - return wals; + return Collections.singletonList(log); + } + + private FSHLog createWAL() throws IOException { + String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId; + return new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), + AbstractFSWALProvider.getWALDirectoryName(factory.factoryId), + HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix, + META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); } @Override public WAL getWAL(RegionInfo region) throws IOException { - return log; + FSHLog log = this.log; + if (log != null) { + return log; + } + synchronized (this) { + log = this.log; + if (log == null) { + log = createWAL(); + this.log = log; + } + } + return log; } @Override public void close() throws IOException { - log.close(); + FSHLog log = this.log; + if (log != null) { + log.close(); + } } @Override public void shutdown() throws IOException { - log.shutdown(); + FSHLog log = this.log; + if (log != null) { + log.shutdown(); + } } private static class IOTestWAL extends FSHLog { @@ -255,4 +282,10 @@ public class IOTestProvider implements WALProvider { public long getLogFileSize() { return this.log.getLogFileSize(); } + + @Override + public void addWALActionsListener(WALActionsListener listener) { + // TODO Implement WALProvider.addWALActionLister + + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java index 3a39ee9..b24daa1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java @@ -185,7 +185,7 @@ public class TestBoundedRegionGroupingStrategy { // Set HDFS root directory for storing WAL FSUtils.setRootDir(CONF, TEST_UTIL.getDataTestDirOnTestFS()); - wals = new WALFactory(CONF, null, "setMembershipDedups"); + wals = new WALFactory(CONF, "setMembershipDedups"); Set seen = new HashSet<>(temp * 4); int count = 0; // we know that this should see one of the wals more than once diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java index 5aea0cf..c3615a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java @@ -200,7 +200,7 @@ public class TestFSHLogProvider { } Configuration localConf = new Configuration(conf); localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName()); - WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName()); + WALFactory wals = new WALFactory(localConf, currentTest.getMethodName()); try { RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); RegionInfo hri2 = RegionInfoBuilder.newBuilder(htd2.getTableName()).build(); @@ -280,7 +280,7 @@ public class TestFSHLogProvider { } Configuration localConf = new Configuration(conf); localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName()); - WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName()); + WALFactory wals = new WALFactory(localConf, currentTest.getMethodName()); try { WAL wal = wals.getWAL(null); assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal)); @@ -354,7 +354,7 @@ public class TestFSHLogProvider { public void setMembershipDedups() throws IOException { Configuration localConf = new Configuration(conf); localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName()); - WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName()); + WALFactory wals = new WALFactory(localConf, currentTest.getMethodName()); try { final Set seen = new HashSet<>(1); assertTrue("first attempt to add WAL from default provider should work.", diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java index 814320a..8193806 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java @@ -119,7 +119,7 @@ public class TestSecureWAL { final byte[] value = Bytes.toBytes("Test value"); FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); final WALFactory wals = - new WALFactory(TEST_UTIL.getConfiguration(), null, tableName.getNameAsString()); + new WALFactory(TEST_UTIL.getConfiguration(), tableName.getNameAsString()); // Write the WAL final WAL wal = wals.getWAL(regionInfo); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 5679d96..a65d97c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -105,7 +105,7 @@ public class TestWALFactory { fs = cluster.getFileSystem(); dir = new Path(hbaseDir, currentTest.getMethodName()); this.currentServername = ServerName.valueOf(currentTest.getMethodName(), 16010, 1); - wals = new WALFactory(conf, null, this.currentServername.toString()); + wals = new WALFactory(conf, this.currentServername.toString()); } @After diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java index 9e88f6e..b20b3a5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java @@ -106,7 +106,7 @@ public class TestWALMethods { final Configuration walConf = new Configuration(util.getConfiguration()); FSUtils.setRootDir(walConf, regiondir); - (new WALFactory(walConf, null, "dummyLogName")).getWAL(null); + (new WALFactory(walConf, "dummyLogName")).getWAL(null); NavigableSet files = WALSplitter.getSplitEditFilesSorted(fs, regiondir); assertEquals(7, files.size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java index 3cbd37e..bc21a65 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java @@ -152,7 +152,7 @@ public class TestWALReaderOnSecureWAL { WALProvider.AsyncWriter.class); conf.setBoolean(WAL_ENCRYPTION, true); FileSystem fs = TEST_UTIL.getTestFileSystem(); - final WALFactory wals = new WALFactory(conf, null, currentTest.getMethodName()); + final WALFactory wals = new WALFactory(conf, currentTest.getMethodName()); Path walPath = writeWAL(wals, currentTest.getMethodName(), offheap); // Insure edits are not plaintext @@ -195,9 +195,8 @@ public class TestWALReaderOnSecureWAL { WALProvider.Writer.class); conf.setBoolean(WAL_ENCRYPTION, false); FileSystem fs = TEST_UTIL.getTestFileSystem(); - final WALFactory wals = new WALFactory(conf, null, - ServerName.valueOf(currentTest.getMethodName(), 16010, - System.currentTimeMillis()).toString()); + final WALFactory wals = new WALFactory(conf, ServerName + .valueOf(currentTest.getMethodName(), 16010, System.currentTimeMillis()).toString()); Path walPath = writeWAL(wals, currentTest.getMethodName(), false); // Ensure edits are plaintext diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java index 412acb6..40fad6a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java @@ -90,7 +90,7 @@ public class TestWALRootDir { @Test public void testWALRootDir() throws Exception { RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build(); - wals = new WALFactory(conf, null, "testWALRootDir"); + wals = new WALFactory(conf, "testWALRootDir"); WAL log = wals.getWAL(regionInfo); assertEquals(1, getWALFiles(walFs, walRootDir).size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index 9b98859..011c9ed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -194,7 +194,7 @@ public class TestWALSplit { REGIONS.clear(); Collections.addAll(REGIONS, "bbb", "ccc"); InstrumentedLogWriter.activateFailure = false; - wals = new WALFactory(conf, null, name.getMethodName()); + wals = new WALFactory(conf, name.getMethodName()); WALDIR = new Path(HBASELOGDIR, AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(), 16010, System.currentTimeMillis()).toString())); @@ -629,7 +629,7 @@ public class TestWALSplit { LOG.debug("no previous CORRUPTDIR to clean."); } // change to the faulty reader - wals = new WALFactory(conf, null, name.getMethodName()); + wals = new WALFactory(conf, name.getMethodName()); generateWALs(-1); // Our reader will render all of these files corrupt. final Set walDirContents = new HashSet<>(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index a9bad00..374c83d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -321,7 +321,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { rootRegionDir = rootRegionDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); cleanRegionRootDir(fs, rootRegionDir); FSUtils.setRootDir(getConf(), rootRegionDir); - final WALFactory wals = new WALFactory(getConf(), null, "wals"); + final WALFactory wals = new WALFactory(getConf(), "wals"); final HRegion[] regions = new HRegion[numRegions]; final Runnable[] benchmarks = new Runnable[numRegions]; final MockRegionServerServices mockServices = new MockRegionServerServices(getConf()); -- 2.7.4