From 56d77c586c08d783122259bb7ff832bcd592964d Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Tue, 26 Sep 2017 22:27:58 -0700
Subject: [PATCH] HBASE-18846 Accommodate the hbase-indexer/lily/SEP consumer
 deploy-type

Patch to start a standalone RegionServer that registers itself and
optionally stands up Services. It can work without a Master in the mix,
which is useful for testing. It can also be used by hbase-indexer to put
up a Replication sink that builds on public-facing APIs with no need to
extend internals. See the JIRA release note for detail.

This patch adds booleans for whether to start the Admin and Client
Services. Other refactoring moves all thread and service startup into
one location so we can bypass 'services' we don't need. See the JIRA for
an example hbase-site.xml with config to shut down the WAL, caches, etc.
Adds checks that a service/thread has been set up before we go to use
it. See the JIRA too for an example Connection implementation that makes
use of the Connection plugin point to receive a replication stream.

The default replication sink catches the incoming replication stream,
undoes the WALEdits, and then creates a Table to call batch with the
edits; up on the JIRA, an example Connection plugin (legit, supported)
returns a Table with an overridden batch method wherein we do index
inserts, returning appropriate results to keep the replication engine
ticking over.

Upsides: an unadulterated RegionServer that keeps replication metrics
and even hosts a web UI if wanted. No hacks; just ordained configs
shutting down unused services. Injection of the indexing function at a
blessed point with no pollution by hbase internals; only public imports.
No use of Private nor LimitedPrivate classes.
---
 conf/hbase-site.xml                                |  90 ++++
 conf/log4j.properties                              |   3 +-
 .../hadoop/hbase/client/IndexerConnection.java     | 358 +++++++++++++++
 .../hadoop/hbase/regionserver/HRegionServer.java   | 492 +++++++++++----------
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  60 ++-
 .../regionserver/TestStandaloneRegionServer.java   |  63 +++
 6 files changed, 826 insertions(+), 240 deletions(-)
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/IndexerConnection.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStandaloneRegionServer.java

diff --git a/conf/hbase-site.xml b/conf/hbase-site.xml
index c516ac7291..49554623e8 100644
--- a/conf/hbase-site.xml
+++ b/conf/hbase-site.xml
@@ -21,4 +21,94 @@
 */
 -->
 <configuration>
+
+  <property>
+    <name>hbase.cluster.distributed</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hbase.client.connection.impl</name>
+    <value>org.apache.hadoop.hbase.client.IndexerConnection</value>
+    <description>A custom Connection implementation just so we can interject our
+      own Table class, one that overrides the batch call which receives
+      the replication stream edits; i.e. it is called by the replication sink
+      #replicateEntries method.</description>
+  </property>
+
+  <property>
+    <name>hbase.regionserver.admin.service</name>
+    <value>false</value>
+    <description>Do NOT stand up an Admin Service Interface on RPC</description>
+  </property>
+  <property>
+    <name>hbase.regionserver.client.service</name>
+    <value>false</value>
+    <description>Do NOT stand up a client-facing Service on RPC</description>
+  </property>
+  <property>
+    <name>hbase.wal.provider</name>
+    <value>org.apache.hadoop.hbase.wal.DisabledWALProvider</value>
+    <description>Set the WAL service to be the null WAL</description>
+  </property>
+  <property>
+    <name>hbase.regionserver.workers</name>
+    <value>false</value>
+    <description>Turn off all background workers: log splitters, executors, etc.</description>
+  </property>
+  <property>
+    <name>hfile.block.cache.size</name>
+    <value>0.0001</value>
+    <description>Turn off the block cache completely</description>
+  </property>
+  <property>
+    <name>hbase.mob.file.cache.size</name>
+    <value>0</value>
+    <description>Disable the MOB cache.</description>
+  </property>
+  <property>
+    <name>hbase.masterless</name>
+    <value>true</value>
+    <description>Do not expect a Master in the cluster.</description>
+  </property>
+
+  <property>
+    <name>hbase.regionserver.metahandler.count</name>
+    <value>1</value>
+    <description>How many priority handlers to run; we probably need none.
+      Default is 20, which is too much on a server like this.</description>
+  </property>
+  <property>
+    <name>hbase.regionserver.replication.handler.count</name>
+    <value>1</value>
+    <description>How many replication handlers to run; we probably need none.
+      Default is 3, which is too much on a server like this.</description>
+  </property>
+  <property>
+    <name>hbase.regionserver.handler.count</name>
+    <value>10</value>
+    <description>How many default handlers to run; tie to the number of CPUs.
+      Default is 30, which is too much on a server like this.</description>
+  </property>
+  <property>
+    <name>hbase.ipc.server.read.threadpool.size</name>
+    <value>3</value>
+    <description>How many Listener request readers to run; tie to a portion of the
+      CPU count (1/4?). Default is 10, which is too much on a server like this.</description>
+  </property>
+
 </configuration>
diff --git a/conf/log4j.properties b/conf/log4j.properties
index 15545fff80..eee314470a 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -94,7 +94,8 @@ log4j.appender.asyncconsole.target=System.err
 log4j.logger.org.apache.zookeeper=INFO
 #log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
-log4j.logger.org.apache.hadoop.hbase=INFO
+#log4j.logger.org.apache.hadoop.hbase=INFO
+log4j.logger.org.apache.hadoop.hbase=DEBUG
 log4j.logger.org.apache.hadoop.hbase.META=INFO
 # Make these two classes INFO-level. Make them DEBUG to see more zk debug.
 log4j.logger.org.apache.hadoop.hbase.zookeeper.ZKUtil=INFO
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IndexerConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IndexerConnection.java
new file mode 100644
index 0000000000..133cb0b20a
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IndexerConnection.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.security.User;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+
+/**
+ * Sample class for hbase-indexer.
+ * Overrides Connection just so we can return a Table that has the
+ * method that the replication sink calls, i.e. Table#batch.
+ * It is at this point that the hbase-indexer catches the replication
+ * stream so it can insert into the Lucene index.
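+ *
+ * <p>Illustrative usage (a deploy assumption, not something this class enforces):
+ * name this class as the connection implementation in hbase-site.xml so the
+ * standalone RegionServer's replication sink hands its batched edits to the
+ * Table#batch override below:
+ * <pre>
+ *   &lt;property&gt;
+ *     &lt;name&gt;hbase.client.connection.impl&lt;/name&gt;
+ *     &lt;value&gt;org.apache.hadoop.hbase.client.IndexerConnection&lt;/value&gt;
+ *   &lt;/property&gt;
+ * </pre>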
+ */ +public class IndexerConnection implements Connection { + private final Configuration conf; + private final User user; + private final ExecutorService pool; + private volatile boolean closed = false; + + public IndexerConnection(Configuration conf, ExecutorService pool, User user) throws IOException { + this.conf = conf; + this.user = user; + this.pool = pool; + } + + @Override + public void abort(String why, Throwable e) {} + + @Override + public boolean isAborted() { + return false; + } + + @Override + public Configuration getConfiguration() { + return this.conf; + } + + @Override + public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { + return null; + } + + @Override + public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { + return null; + } + + @Override + public RegionLocator getRegionLocator(TableName tableName) throws IOException { + return null; + } + + @Override + public Admin getAdmin() throws IOException { + return null; + } + + @Override + public void close() throws IOException { + if (!this.closed) this.closed = true; + } + + @Override + public boolean isClosed() { + return this.closed; + } + + @Override + public TableBuilder getTableBuilder(final TableName tn, ExecutorService pool) { + if (isClosed()) { + throw new RuntimeException("IndexerConnection is closed."); + } + final Configuration passedInConfiguration = getConfiguration(); + return new TableBuilder() { + @Override + public TableBuilder setOperationTimeout(int timeout) { + return null; + } + + @Override + public TableBuilder setRpcTimeout(int timeout) { + return null; + } + + @Override + public TableBuilder setReadRpcTimeout(int timeout) { + return null; + } + + @Override + public TableBuilder setWriteRpcTimeout(int timeout) { + return null; + } + + @Override + public Table build() { + return new Table() { + private final Configuration conf = passedInConfiguration; + private final TableName tableName = tn; + + @Override + public TableName getName() { + return this.tableName; + } + + @Override + public Configuration getConfiguration() { + return this.conf; + } + + @Override + public void batch(List actions, Object[] results) + throws IOException, InterruptedException { + // Implementation goes here. 
+ } + + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + return null; + } + + @Override + public TableDescriptor getDescriptor() throws IOException { + return null; + } + + @Override + public boolean exists(Get get) throws IOException { + return false; + } + + @Override + public boolean[] existsAll(List gets) throws IOException { + return new boolean[0]; + } + + @Override + public void batchCallback(List actions, Object[] results, Batch.Callback callback) throws IOException, InterruptedException { + + } + + @Override + public Result get(Get get) throws IOException { + return null; + } + + @Override + public Result[] get(List gets) throws IOException { + return new Result[0]; + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + return null; + } + + @Override + public ResultScanner getScanner(byte[] family) throws IOException { + return null; + } + + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { + return null; + } + + @Override + public void put(Put put) throws IOException { + + } + + @Override + public void put(List puts) throws IOException { + + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException { + return false; + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException { + return false; + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Put put) throws IOException { + return false; + } + + @Override + public void delete(Delete delete) throws IOException { + + } + + @Override + public void delete(List deletes) throws IOException { + + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException { + return false; + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException { + return false; + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Delete delete) throws IOException { + return false; + } + + @Override + public void mutateRow(RowMutations rm) throws IOException { + + } + + @Override + public Result append(Append append) throws IOException { + return null; + } + + @Override + public Result increment(Increment increment) throws IOException { + return null; + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException { + return 0; + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + + } + + @Override + public CoprocessorRpcChannel coprocessorService(byte[] row) { + return null; + } + + @Override + public Map coprocessorService(Class service, byte[] startKey, byte[] endKey, Batch.Call callable) throws ServiceException, Throwable { + return null; + } + + @Override + public void coprocessorService(Class service, byte[] startKey, byte[] endKey, Batch.Call callable, Batch.Callback callback) throws ServiceException, Throwable { + + } + + @Override + public Map 
batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable { + return null; + } + + @Override + public void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback callback) throws ServiceException, Throwable { + + } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException { + return false; + } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, RowMutations mutation) throws IOException { + return false; + } + + @Override + public void setOperationTimeout(int operationTimeout) { + + } + + @Override + public int getOperationTimeout() { + return 0; + } + + @Override + public int getRpcTimeout() { + return 0; + } + + @Override + public void setRpcTimeout(int rpcTimeout) { + + } + + @Override + public int getReadRpcTimeout() { + return 0; + } + + @Override + public void setReadRpcTimeout(int readRpcTimeout) { + + } + + @Override + public int getWriteRpcTimeout() { + return 0; + } + + @Override + public void setWriteRpcTimeout(int writeRpcTimeout) { + + } + }; + } + }; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 60f4e14e79..5eb56170b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1,5 +1,4 @@ -/** - * +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -233,16 +232,10 @@ public class HRegionServer extends HasThread implements /** * For testing only! Set to true to skip notifying region assignment to master . */ + @VisibleForTesting @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL") public static boolean TEST_SKIP_REPORTING_TRANSITION = false; - /* - * Strings to be used in forming the exception message for - * RegionsAlreadyInTransitionException. - */ - protected static final String OPEN = "OPEN"; - protected static final String CLOSE = "CLOSE"; - //RegionName vs current action in progress //true - if open region action in progress //false - if close region action in progress @@ -379,7 +372,7 @@ public class HRegionServer extends HasThread implements /** * ChoreService used to schedule tasks that we want to run periodically */ - private final ChoreService choreService; + private ChoreService choreService; /* * Check for compactions requests. @@ -395,7 +388,7 @@ public class HRegionServer extends HasThread implements // WAL roller. 
log is protected rather than private to avoid // eclipse warning when accessed by inner classes - protected final LogRoller walRoller; + protected LogRoller walRoller; // flag set after we're done setting up server threads final AtomicBoolean online = new AtomicBoolean(false); @@ -532,6 +525,15 @@ public class HRegionServer extends HasThread implements private final NettyEventLoopGroupConfig eventLoopGroupConfig; /** + * True if this RegionServer is coming up in a cluster where there is no Master; + * means it needs to just come up and make do without a Master to talk to: e.g. in test or + * HRegionServer is doing other than its usual duties: e.g. as an emasculated host whose only + * purpose is as a Replication-stream sink; see HBASE-18846 for more. + */ + private final boolean masterless; + static final String MASTERLESS_CONFIG_NAME = "hbase.masterless"; + + /** * Starts a HRegionServer at the default location. */ public HRegionServer(Configuration conf) throws IOException, InterruptedException { @@ -540,26 +542,23 @@ public class HRegionServer extends HasThread implements /** * Starts a HRegionServer at the default location + * * @param csm implementation of CoordinatedStateManager to be used */ + // Don't start any services or managers in here in the Consructor. + // Defer till after we register with the Master as much as possible. See #startServices. public HRegionServer(Configuration conf, CoordinatedStateManager csm) throws IOException { super("RegionServer"); // thread name this.startcode = System.currentTimeMillis(); - this.fsOk = true; this.conf = conf; - // initialize netty event loop group at the very beginning as we may use it to start rpc server, - // rpc client and WAL. - this.eventLoopGroupConfig = new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup"); - NettyRpcClientConfigHelper.setEventLoopConfig(conf, eventLoopGroupConfig.group(), - eventLoopGroupConfig.clientChannelClass()); - NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, eventLoopGroupConfig.group(), - eventLoopGroupConfig.clientChannelClass()); + this.fsOk = true; + this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false); + this.eventLoopGroupConfig = setupNetty(this.conf); MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf); HFile.checkHFileVersion(this.conf); checkCodecs(this.conf); this.userProvider = UserProvider.instantiate(conf); FSUtils.setupShortCircuitRead(this.conf); - Replication.decorateRegionServerConfiguration(this.conf); // Disable usage of meta replicas in the regionserver @@ -575,15 +574,12 @@ public class HRegionServer extends HasThread implements boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true); this.nonceManager = isNoncesEnabled ? 
new ServerNonceManager(this.conf) : null; - this.numRegionsToReport = conf.getInt( - "hbase.regionserver.numregionstoreport", 10); + this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10); - this.operationTimeout = conf.getInt( - HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - this.shortOperationTimeout = conf.getInt( - HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, + this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); this.abortRequested = false; @@ -605,9 +601,9 @@ public class HRegionServer extends HasThread implements } } } - String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead : - rpcServices.isa.getHostName(); - serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode); + String hostName = shouldUseThisHostnameInstead()? + this.useThisHostnameInstead: this.rpcServices.isa.getHostName(); + serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode); rpcControllerFactory = RpcControllerFactory.instantiate(this.conf); rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); @@ -632,8 +628,6 @@ public class HRegionServer extends HasThread implements }; initializeFileSystem(); - - service = new ExecutorService(getServerName().toShortString()); spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration()); // Some unit tests don't need a cluster, so no zookeeper at all @@ -642,43 +636,45 @@ public class HRegionServer extends HasThread implements zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" + rpcServices.isa.getPort(), this, canCreateBaseZNode()); - this.csm = (BaseCoordinatedStateManager) csm; - this.csm.initialize(this); - this.csm.start(); + // If no master in cluster, skip trying to track one or look for a cluster status. + if (!this.masterless) { + this.csm = (BaseCoordinatedStateManager) csm; + this.csm.initialize(this); + this.csm.start(); - masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); - masterAddressTracker.start(); + masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); + masterAddressTracker.start(); - clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this); - clusterStatusTracker.start(); + clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this); + clusterStatusTracker.start(); + } } this.configurationManager = new ConfigurationManager(); + setupWindows(getConfiguration(), getConfigurationManager()); + } - rpcServices.start(); - putUpWebUI(); - this.walRoller = new LogRoller(this, this); - this.choreService = new ChoreService(getServerName().toString(), true); - this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); - + /** + * If running on Windows, do windows-specific setup. + */ + private static void setupWindows(final Configuration conf, ConfigurationManager cm) { if (!SystemUtils.IS_OS_WINDOWS) { Signal.handle(new Signal("HUP"), new SignalHandler() { @Override public void handle(Signal signal) { - getConfiguration().reloadConfiguration(); - configurationManager.notifyAllObservers(getConfiguration()); + conf.reloadConfiguration(); + cm.notifyAllObservers(conf); } }); } - // Create the CompactedFileDischarger chore service. This chore helps to - // remove the compacted files - // that will no longer be used in reads. 
- // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to - // 2 mins so that compacted files can be archived before the TTLCleaner runs - int cleanerInterval = - conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); - this.compactedFileDischarger = - new CompactedHFilesDischarger(cleanerInterval, this, this); - choreService.scheduleChore(compactedFileDischarger); + } + + private static NettyEventLoopGroupConfig setupNetty(Configuration conf) { + // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL. + NettyEventLoopGroupConfig nelgc = + new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup"); + NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass()); + NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass()); + return nelgc; } private void initializeFileSystem() throws IOException { @@ -722,7 +718,7 @@ public class HRegionServer extends HasThread implements "hbase.regionserver.kerberos.principal", host); } - protected void waitForMasterActive(){ + protected void waitForMasterActive() { } protected String getProcessName() { @@ -730,7 +726,7 @@ public class HRegionServer extends HasThread implements } protected boolean canCreateBaseZNode() { - return false; + return this.masterless; } protected boolean canUpdateTableDescriptor() { @@ -773,8 +769,8 @@ public class HRegionServer extends HasThread implements /** * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to - * the local server. Safe to use going to local or remote server. - * Create this instance in a method can be intercepted and mocked in tests. + * the local server; i.e. a short-circuit Connection. Safe to use going to local or remote + * server. Create this instance in a method can be intercepted and mocked in tests. * @throws IOException */ @VisibleForTesting @@ -820,28 +816,20 @@ public class HRegionServer extends HasThread implements /** * All initialization needed before we go register with Master. + * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master. + * In here we just put up the RpcServer, setup Connection, and ZooKeeper. * * @throws IOException * @throws InterruptedException */ - private void preRegistrationInitialization(){ + private void preRegistrationInitialization() { try { setupClusterConnection(); - - this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); - this.secureBulkLoadManager.start(); - - // Health checker thread. - if (isHealthCheckerConfigured()) { - int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, - HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); - healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); - } - initializeZooKeeper(); - if (!isStopped() && !isAborted()) { - initializeThreads(); - } + this.rpcServices.start(); + // Setup RPC client for master communication + this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( + this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); } catch (Throwable t) { // Call stop if error or process will stick around for ever since server // puts up non-daemon threads. 
@@ -861,6 +849,9 @@ public class HRegionServer extends HasThread implements @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE", justification="cluster Id znode read would give us correct response") private void initializeZooKeeper() throws IOException, InterruptedException { + // Nothing to do in here if no Master in the mix. + if (this.masterless) return; + // Create the master address tracker, register with zk, and start it. Then // block until a master is available. No point in starting up if no master // running. @@ -907,7 +898,7 @@ public class HRegionServer extends HasThread implements } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED", - justification="We don't care about the return") + justification="We don't care about the return") private void doLatch(final CountDownLatch latch) throws InterruptedException { if (latch != null) { // Result is ignored intentionally but if I remove the below, findbugs complains (the @@ -933,67 +924,11 @@ public class HRegionServer extends HasThread implements } /** - * @return False if cluster shutdown in progress + * @return True if the cluster is up. */ private boolean isClusterUp() { - return clusterStatusTracker != null && clusterStatusTracker.isClusterUp(); - } - - private void initializeThreads() throws IOException { - // Cache flushing thread. - this.cacheFlusher = new MemStoreFlusher(conf, this); - - // Compaction thread - this.compactSplitThread = new CompactSplit(this); - - // Background thread to check for compactions; needed if region has not gotten updates - // in a while. It will take care of not checking too frequently on store-by-store basis. - this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this); - this.periodicFlusher = new PeriodicMemStoreFlusher(this.threadWakeFrequency, this); - this.leases = new Leases(this.threadWakeFrequency); - - // Create the thread to clean the moved regions list - movedRegionsCleaner = MovedRegionsCleaner.create(this); - - if (this.nonceManager != null) { - // Create the scheduled chore that cleans up nonces. - nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); - } - - // Setup the Quota Manager - rsQuotaManager = new RegionServerRpcQuotaManager(this); - rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); - - if (QuotaUtil.isQuotaEnabled(conf)) { - this.fsUtilizationChore = new FileSystemUtilizationChore(this); - } - - // Setup RPC client for master communication - rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( - rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); - - boolean onlyMetaRefresh = false; - int storefileRefreshPeriod = conf.getInt( - StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD - , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); - if (storefileRefreshPeriod == 0) { - storefileRefreshPeriod = conf.getInt( - StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, - StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); - onlyMetaRefresh = true; - } - if (storefileRefreshPeriod > 0) { - this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, - onlyMetaRefresh, this, this); - } - registerConfigurationObservers(); - } - - private void registerConfigurationObservers() { - // Registering the compactSplitThread object with the ConfigurationManager. 
- configurationManager.registerObserver(this.compactSplitThread); - configurationManager.registerObserver(this.rpcServices); - configurationManager.registerObserver(this); + return this.masterless || + this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp(); } /** @@ -1018,6 +953,7 @@ public class HRegionServer extends HasThread implements // Try and register with the Master; tell it we are here. Break if // server is stopped or the clusterup flag is down or hdfs went wacky. + // Once registered successfully, go ahead and start up all Services. while (keepLooping()) { RegionServerStartupResponse w = reportForDuty(); if (w == null) { @@ -1029,14 +965,19 @@ public class HRegionServer extends HasThread implements } } - if (!isStopped() && isHealthy()){ + if (!isStopped() && isHealthy()) { // start the snapshot handler and other procedure handlers, // since the server is ready to run - rspmHost.start(); - + if (this.rspmHost != null) { + this.rspmHost.start(); + } // Start the Quota Manager - rsQuotaManager.start(getRpcServer().getScheduler()); - rsSpaceQuotaManager.start(); + if (this.rsQuotaManager != null) { + rsQuotaManager.start(getRpcServer().getScheduler()); + } + if (this.rsSpaceQuotaManager != null) { + this.rsSpaceQuotaManager.start(); + } } // We registered with the Master. Go into run mode. @@ -1175,8 +1116,7 @@ public class HRegionServer extends HasThread implements if (!this.killed && this.fsOk) { waitOnAllRegionsToClose(abortRequested); - LOG.info("stopping server " + this.serverName + - "; all regions closed."); + LOG.info("stopping server " + this.serverName + "; all regions closed."); } //fsOk flag may be changed when closing regions throws exception. @@ -1222,8 +1162,7 @@ public class HRegionServer extends HasThread implements if (this.zooKeeper != null) { this.zooKeeper.close(); } - LOG.info("stopping server " + this.serverName + - "; zookeeper connection closed."); + LOG.info("stopping server " + this.serverName + "; zookeeper connection closed."); LOG.info(Thread.currentThread().getName() + " exiting"); } @@ -1266,8 +1205,7 @@ public class HRegionServer extends HasThread implements ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime); try { RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder(); - ServerName sn = ServerName.parseVersionedServerName( - this.serverName.getVersionedBytes()); + ServerName sn = ServerName.parseVersionedServerName(this.serverName.getVersionedBytes()); request.setServer(ProtobufUtil.toServerName(sn)); request.setLoad(sl); rss.regionServerReport(null, request.build()); @@ -1378,8 +1316,7 @@ public class HRegionServer extends HasThread implements maxMemory = usage.getMax(); } - ClusterStatusProtos.ServerLoad.Builder serverLoad = - ClusterStatusProtos.ServerLoad.newBuilder(); + ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder(); serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond()); serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount()); serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024)); @@ -1471,12 +1408,12 @@ public class HRegionServer extends HasThread implements // iterator of onlineRegions to close all user regions. 
for (Map.Entry e : this.onlineRegions.entrySet()) { RegionInfo hri = e.getValue().getRegionInfo(); - if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) - && !closedRegions.contains(hri.getEncodedName())) { + if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) && + !closedRegions.contains(hri.getEncodedName())) { closedRegions.add(hri.getEncodedName()); // Don't update zk with this close transition; pass false. closeRegionIgnoreErrors(hri, abort); - } + } } // No regions in RIT, we could stop waiting now. if (this.regionsInTransitionInRS.isEmpty()) { @@ -1538,8 +1475,8 @@ public class HRegionServer extends HasThread implements // The hostname the master sees us as. if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) { String hostnameFromMasterPOV = e.getValue(); - this.serverName = ServerName.valueOf(hostnameFromMasterPOV, - rpcServices.isa.getPort(), this.startcode); + this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(), + this.startcode); if (shouldUseThisHostnameInstead() && !hostnameFromMasterPOV.equals(useThisHostnameInstead)) { String msg = "Master passed us a different hostname to use; was=" + @@ -1579,8 +1516,7 @@ public class HRegionServer extends HasThread implements // hack! Maps DFSClient => RegionServer for logs. HDFS made this // config param for task trackers, but we can piggyback off of it. if (this.conf.get("mapreduce.task.attempt.id") == null) { - this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + - this.serverName.toString()); + this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString()); } // Save it in a file, this will allow to see if we crash @@ -1594,14 +1530,16 @@ public class HRegionServer extends HasThread implements this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource()); pauseMonitor.start(); - startServiceThreads(); - startHeapMemoryManager(); - // Call it after starting HeapMemoryManager. - initializeMemStoreChunkCreator(); - LOG.info("Serving as " + this.serverName + - ", RpcServer on " + rpcServices.isa + - ", sessionid=0x" + - Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); + // There is a rare case where we do NOT want services to start. Check config. + if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) { + startServices(); + } + startReplicationService(); + + // Set up ZK + LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + + ", sessionid=0x" + + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); // Wake up anyone waiting for this server to online synchronized (online) { @@ -1626,15 +1564,15 @@ public class HRegionServer extends HasThread implements long globalMemStoreSize = pair.getFirst(); boolean offheap = this.regionServerAccounting.isOffheap(); // When off heap memstore in use, take full area for chunk pool. - float poolSizePercentage = offheap ? 1.0F - : conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT); + float poolSizePercentage = offheap? 
1.0F: + conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT); float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); // init the chunkCreator ChunkCreator chunkCreator = ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, - initialCountPercentage, this.hMemManager); + initialCountPercentage, this.hMemManager); } } @@ -1651,8 +1589,7 @@ public class HRegionServer extends HasThread implements rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1); rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray()); - ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, - getMyEphemeralNodePath(), data); + ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data); } private void deleteMyEphemeralNode() throws KeeperException { @@ -1755,8 +1692,7 @@ public class HRegionServer extends HasThread implements // immediately upon region server startup private long iteration = 1; - CompactionChecker(final HRegionServer h, final int sleepTime, - final Stoppable stopper) { + CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { super("CompactionChecker", stopper, sleepTime); this.instance = h; LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime)); @@ -1765,8 +1701,8 @@ public class HRegionServer extends HasThread implements * If not set, the compaction will use default priority. */ this.majorCompactPriority = this.instance.conf. - getInt("hbase.regionserver.compactionChecker.majorCompactPriority", - DEFAULT_PRIORITY); + getInt("hbase.regionserver.compactionChecker.majorCompactPriority", + DEFAULT_PRIORITY); } @Override @@ -1792,12 +1728,13 @@ public class HRegionServer extends HasThread implements if (majorCompactPriority == DEFAULT_PRIORITY || majorCompactPriority > hr.getCompactPriority()) { this.instance.compactSplitThread.requestCompaction(hr, s, - getName() + " requests major compaction; use default priority", Store.NO_PRIORITY, - CompactionLifeCycleTracker.DUMMY, null); + getName() + " requests major compaction; use default priority", + Store.NO_PRIORITY, + CompactionLifeCycleTracker.DUMMY, null); } else { this.instance.compactSplitThread.requestCompaction(hr, s, - getName() + " requests major compaction; use configured priority", - this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); + getName() + " requests major compaction; use configured priority", + this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); } } } catch (IOException e) { @@ -1855,6 +1792,7 @@ public class HRegionServer extends HasThread implements /** * Setup WAL log and replication if enabled. * Replication setup is done in here because it wants to be hooked up to WAL. + * * @throws IOException */ private void setupWALAndReplication() throws IOException { @@ -1866,11 +1804,10 @@ public class HRegionServer extends HasThread implements if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir); if (this.walFs.exists(logDir)) { throw new RegionServerRunningException("Region server has already " + - "created directory at " + this.serverName.toString()); + "created directory at " + this.serverName.toString()); } - // Instantiate replication manager if replication enabled. Pass it the - // log directories. 
+ // Instantiate replication if replication enabled. Pass it the log directories. createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir); // listeners the wal factory will add to wals it creates. @@ -1899,6 +1836,25 @@ public class HRegionServer extends HasThread implements } } + /** + * Start up replication source and sink handlers. + * @throws IOException + */ + private void startReplicationService() throws IOException { + if (this.replicationSourceHandler == this.replicationSinkHandler && + this.replicationSourceHandler != null) { + this.replicationSourceHandler.startReplicationService(); + } else { + if (this.replicationSourceHandler != null) { + this.replicationSourceHandler.startReplicationService(); + } + if (this.replicationSinkHandler != null) { + this.replicationSinkHandler.startReplicationService(); + } + } + } + + public MetricsRegionServer getRegionServerMetrics() { return this.metricsRegionServer; } @@ -1912,6 +1868,8 @@ public class HRegionServer extends HasThread implements /* * Start maintenance Threads, Server, Worker and lease checker threads. + * Start all threads we need to run. This is called after we've successfully + * registered with the Master. * Install an UncaughtExceptionHandler that calls abort of RegionServer if we * get an unhandled exception. We cannot set the handler on all threads. * Server's internal Listener thread is off limits. For Server, if an OOME, it @@ -1922,35 +1880,67 @@ public class HRegionServer extends HasThread implements * keeps its own internal stop mechanism so needs to be stopped by this * hosting server. Worker logs the exception and exits. */ - private void startServiceThreads() throws IOException { + private void startServices() throws IOException { + // initializeThreads needs chore service to be up. + this.choreService = new ChoreService(getServerName().toString(), true); + if (!isStopped() && !isAborted()) { + initializeThreads(); + } + this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); + this.secureBulkLoadManager.start(); + + putUpWebUI(); + + // Health checker thread. + if (isHealthCheckerConfigured()) { + int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, + HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); + healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); + } + + this.walRoller = new LogRoller(this, this); + this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); + + // Create the CompactedFileDischarger chore service. This chore helps to + // remove the compacted files + // that will no longer be used in reads. + // Default is 2 mins. 
The default value for TTLCleaner is 5 mins so we set this to + // 2 mins so that compacted files can be archived before the TTLCleaner runs + int cleanerInterval = + conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); + this.compactedFileDischarger = + new CompactedHFilesDischarger(cleanerInterval, this, this); + choreService.scheduleChore(compactedFileDischarger); + // Start executor services + this.service = new ExecutorService(getServerName().toShortString()); this.service.startExecutorService(ExecutorType.RS_OPEN_REGION, - conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); + conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); this.service.startExecutorService(ExecutorType.RS_OPEN_META, - conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); + conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); this.service.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION, - conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3)); + conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3)); this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION, - conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); + conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); this.service.startExecutorService(ExecutorType.RS_CLOSE_META, - conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); + conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, - conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); + conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); } this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( - "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); + "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); // Start the threads for compacted files discharger this.service.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER, - conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10)); + conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10)); if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS, - conf.getInt("hbase.regionserver.region.replica.flusher.threads", - conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); + conf.getInt("hbase.regionserver.region.replica.flusher.threads", + conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); } Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", - uncaughtExceptionHandler); + uncaughtExceptionHandler); this.cacheFlusher.start(uncaughtExceptionHandler); if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker); @@ -1964,19 +1954,7 @@ public class HRegionServer extends HasThread implements // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. 
Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker", - uncaughtExceptionHandler); - - if (this.replicationSourceHandler == this.replicationSinkHandler && - this.replicationSourceHandler != null) { - this.replicationSourceHandler.startReplicationService(); - } else { - if (this.replicationSourceHandler != null) { - this.replicationSourceHandler.startReplicationService(); - } - if (this.replicationSinkHandler != null) { - this.replicationSinkHandler.startReplicationService(); - } - } + uncaughtExceptionHandler); // Create the log splitting worker and start it // set a smaller retries to fast fail otherwise splitlogworker could be blocked for @@ -1984,12 +1962,77 @@ public class HRegionServer extends HasThread implements // tasks even after current task is preempted after a split task times out. Configuration sinkConf = HBaseConfiguration.create(conf); sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds + conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds + conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); - this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory); - splitLogWorker.start(); + if (this.csm != null) { + // SplitLogWorker needs csm. If none, don't start this. + this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, + this, walFactory); + splitLogWorker.start(); + } else { + LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null"); + } + + // Memstore services. + startHeapMemoryManager(); + // Call it after starting HeapMemoryManager. + initializeMemStoreChunkCreator(); + } + + private void initializeThreads() throws IOException { + // Cache flushing thread. + this.cacheFlusher = new MemStoreFlusher(conf, this); + + // Compaction thread + this.compactSplitThread = new CompactSplit(this); + + // Background thread to check for compactions; needed if region has not gotten updates + // in a while. It will take care of not checking too frequently on store-by-store basis. + this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this); + this.periodicFlusher = new PeriodicMemStoreFlusher(this.threadWakeFrequency, this); + this.leases = new Leases(this.threadWakeFrequency); + + // Create the thread to clean the moved regions list + movedRegionsCleaner = MovedRegionsCleaner.create(this); + + if (this.nonceManager != null) { + // Create the scheduled chore that cleans up nonces. 
+ nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); + } + + // Setup the Quota Manager + rsQuotaManager = new RegionServerRpcQuotaManager(this); + rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); + + if (QuotaUtil.isQuotaEnabled(conf)) { + this.fsUtilizationChore = new FileSystemUtilizationChore(this); + } + + + boolean onlyMetaRefresh = false; + int storefileRefreshPeriod = conf.getInt( + StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, + StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); + if (storefileRefreshPeriod == 0) { + storefileRefreshPeriod = conf.getInt( + StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, + StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); + onlyMetaRefresh = true; + } + if (storefileRefreshPeriod > 0) { + this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, + onlyMetaRefresh, this, this); + } + registerConfigurationObservers(); + } + + private void registerConfigurationObservers() { + // Registering the compactSplitThread object with the ConfigurationManager. + configurationManager.registerObserver(this.compactSplitThread); + configurationManager.registerObserver(this.rpcServices); + configurationManager.registerObserver(this); } /** @@ -2057,14 +2100,15 @@ public class HRegionServer extends HasThread implements return false; } // Verify that all threads are alive - if (!(leases.isAlive() - && cacheFlusher.isAlive() && walRoller.isAlive() - && this.compactionChecker.isScheduled() - && this.periodicFlusher.isScheduled())) { + boolean healthy = (this.leases == null || this.leases.isAlive()) + && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) + && (this.walRoller == null || this.walRoller.isAlive()) + && (this.compactionChecker == null || this.compactionChecker.isScheduled()) + && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()); + if (!healthy) { stop("One or more threads are no longer alive -- stop"); - return false; } - return true; + return healthy; } private static final byte[] UNSPECIFIED_REGION = new byte[]{}; @@ -2087,7 +2131,9 @@ public class HRegionServer extends HasThread implements byte[] namespace = regionInfo.getTable().getNamespace(); wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes(), namespace); } - walRoller.addWAL(wal); + if (this.walRoller != null) { + this.walRoller.addWAL(wal); + } return wal; } @@ -2306,9 +2352,10 @@ public class HRegionServer extends HasThread implements // RegionReplicaFlushHandler might reset this. 
// submit it to be handled by one of the handlers so that we do not block OpenRegionHandler - this.service.submit( - new RegionReplicaFlushHandler(this, clusterConnection, - rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region)); + if (this.service != null) { + this.service.submit(new RegionReplicaFlushHandler(this, clusterConnection, + rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region)); + } } @Override @@ -2568,6 +2615,7 @@ public class HRegionServer extends HasThread implements * @throws IOException */ private RegionServerStartupResponse reportForDuty() throws IOException { + if (this.masterless) return RegionServerStartupResponse.getDefaultInstance(); ServerName masterServerName = createRegionServerStatusStub(true); if (masterServerName == null) return null; RegionServerStartupResponse result = null; @@ -2894,8 +2942,8 @@ public class HRegionServer extends HasThread implements /** * Load the replication service objects, if any */ - private static void createNewReplicationInstance(Configuration conf, - HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir) throws IOException{ + private static void createNewReplicationInstance(Configuration conf, HRegionServer server, + FileSystem walFs, Path walDir, Path oldWALDir) throws IOException { if ((server instanceof HMaster) && (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { @@ -3389,8 +3437,7 @@ public class HRegionServer extends HasThread implements // This map will contains all the regions that we closed for a move. // We add the time it was moved as we don't want to keep too old information - protected Map movedRegions = - new ConcurrentHashMap<>(3000); + protected Map movedRegions = new ConcurrentHashMap<>(3000); // We need a timeout. If not there is a risk of giving a wrong information: this would double // the number of network calls instead of reducing them. @@ -3451,7 +3498,6 @@ public class HRegionServer extends HasThread implements /** * Creates a Chore thread to clean the moved region cache. */ - protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable { private HRegionServer regionServer; Stoppable stoppable; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index a565eeb557..750bb02a9d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -315,6 +315,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false); /** + * Services launched in RSRpcServices. By default they are on but you can use the below + * booleans to selectively enable/disable either Admin or Client Service (Rare is the case + * where you would ever turn off one or the other). + */ + public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG = + "hbase.regionserver.admin.service"; + public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG = + "hbase.regionserver.client.service"; + + /** * An Rpc callback for closing a RegionScanner. 
*/ private static final class RegionScannerCloseCallBack implements RpcCallback { @@ -1426,17 +1436,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } /** - * @return list of blocking services and their security info classes that this server supports + * By default, put up an Admin and a Client Service. + * Set booleans hbase.regionserver.admin.service and + * hbase.regionserver.client.service if you want to enable/disable services. + * Default is that both are enabled. + * @return immutable list of blocking services and the security info classes that this server + * supports */ protected List getServices() { - List bssi = new ArrayList<>(2); - bssi.add(new BlockingServiceAndInterface( + boolean admin = + getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true); + boolean client = + getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true); + List bssi = new ArrayList<>(); + if (client) { + bssi.add(new BlockingServiceAndInterface( ClientService.newReflectiveBlockingService(this), ClientService.BlockingInterface.class)); - bssi.add(new BlockingServiceAndInterface( + } + if (admin) { + bssi.add(new BlockingServiceAndInterface( AdminService.newReflectiveBlockingService(this), AdminService.BlockingInterface.class)); - return bssi; + } + return new org.apache.hadoop.hbase.shaded.com.google.common.collect. + ImmutableList.Builder().addAll(bssi).build(); } public InetSocketAddress getSocketAddress() { @@ -1964,20 +1988,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // If there is no action in progress, we can submit a specific handler. // Need to pass the expected version in the constructor. - if (region.isMetaRegion()) { - regionServer.service.submit(new OpenMetaHandler( - regionServer, regionServer, region, htd, masterSystemTime)); + if (regionServer.service == null) { + LOG.info("No executor service; skipping open request"); } else { - if (regionOpenInfo.getFavoredNodesCount() > 0) { - regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(), - regionOpenInfo.getFavoredNodesList()); - } - if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) { - regionServer.service.submit(new OpenPriorityRegionHandler( - regionServer, regionServer, region, htd, masterSystemTime)); + if (region.isMetaRegion()) { + regionServer.service.submit(new OpenMetaHandler( + regionServer, regionServer, region, htd, masterSystemTime)); } else { - regionServer.service.submit(new OpenRegionHandler( + if (regionOpenInfo.getFavoredNodesCount() > 0) { + regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(), + regionOpenInfo.getFavoredNodesList()); + } + if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) { + regionServer.service.submit(new OpenPriorityRegionHandler( regionServer, regionServer, region, htd, masterSystemTime)); + } else { + regionServer.service.submit(new OpenRegionHandler( + regionServer, regionServer, region, htd, masterSystemTime)); + } } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStandaloneRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStandaloneRegionServer.java new file mode 100644 index 0000000000..8e6d07f5e5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStandaloneRegionServer.java @@ -0,0 +1,63 @@ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import 
org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; + +import java.io.IOException; + +import static junit.framework.TestCase.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Test starting up a RegionServer w/o a Master: i.e. make sure + * {{@link HRegionServer#MASTERLESS_CONFIG_NAME}} flag is working. + */ +@Category(SmallTests.class) +public class TestStandaloneRegionServer { + @Rule public TestName name = new TestName(); + @Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()). + withLookingForStuckThread(true).build(); + private static final HBaseTestingUtility HTU = HBaseTestingUtility.createLocalHTU(); + + @BeforeClass + public static void beforeClass() throws Exception { + // Make it so HRegionServer comes up Masterless. + HTU.getConfiguration().setBoolean(HRegionServer.MASTERLESS_CONFIG_NAME, true); + HTU.startMiniZKCluster(); + } + + @AfterClass + public static void afterClass() throws IOException { + HTU.shutdownMiniZKCluster(); + } + + /** + * Test a HRegionServer comes up and that the Admin and Client Services are available. + */ + @Test + public void testRegionServerComesUp() throws IOException, InterruptedException { + HRegionServer hrs = new HRegionServer(HTU.getConfiguration(), null); + hrs.start(); + while (!hrs.isOnline()) Threads.sleep(1); + assertTrue(hrs.isOnline()); + assertNotNull(hrs.getServerName()); + assertFalse(hrs.isAborted()); // This is an Admin Interface call. + assertTrue(hrs.getRegions().isEmpty()); // This is a Client Interface call. + hrs.stop("Test is done!"); + while(hrs.isStopping()) Threads.sleep(1); + assertTrue(hrs.isStopped()); + // Stop is not enough because zk shutdown happens AFTER stop. If we don't join, we'll have a + // messy zk shutdown. + hrs.join(); + } +} -- 2.11.0 (Apple Git-81)