commit 47e01d944a17f817f0583c5c624e5e048de20e3c Author: Enis Soztutar Date: Thu Apr 16 23:30:27 2015 -0700 v4 with only cleanup diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java index 73512fa..4eb5792 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java @@ -174,4 +174,10 @@ public class RetryCounter { public int getAttemptTimes() { return attempts; } + + @Override + public String toString() { + return new StringBuilder(" retrying, attempts:") + .append(attempts).append("/").append(retryConfig.getMaxAttempts()).toString(); + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java index d682ccc..40e1559 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java @@ -468,7 +468,7 @@ public class HFileArchiver { * @throws IOException if a file cannot be deleted. All files will be attempted to deleted before * throwing the exception, rather than failing at the first file. */ - private static void deleteStoreFilesWithoutArchiving(Collection compactedFiles) + public static void deleteStoreFilesWithoutArchiving(Collection compactedFiles) throws IOException { LOG.debug("Deleting store files without archiving."); List errors = new ArrayList(0); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java index 1ec085f..f9c059f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.mortbay.log.Log; @@ -42,20 +44,33 @@ import org.mortbay.log.Log; @InterfaceAudience.Private public class ClientSideRegionScanner extends AbstractClientScanner { - private HRegion region; - RegionScanner scanner; + private final Region region; + private final boolean closeRegion; // whether to close the region at the end + private RegionScanner scanner; List values; public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException { + // open region from the snapshot directory + this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null); + this.closeRegion = true; + // region is immutable, set isolation level scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); - // open region from the snapshot directory - this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null); + init(this.region, scan, scanMetrics); + } + + @InterfaceAudience.Private + public ClientSideRegionScanner(Region region, Scan scan) throws IOException { + this.region = region; + 
this.closeRegion = false; + init(region, scan, null); + } + private void init(Region region, Scan scan, ScanMetrics scanMetrics) throws IOException { // create an internal region scanner this.scanner = region.getScanner(scan); values = new ArrayList(); @@ -65,7 +80,6 @@ public class ClientSideRegionScanner extends AbstractClientScanner { } else { this.scanMetrics = scanMetrics; } - region.startRegionOperation(); } @Override @@ -94,16 +108,15 @@ public class ClientSideRegionScanner extends AbstractClientScanner { if (this.scanner != null) { try { this.scanner.close(); - this.scanner = null; } catch (IOException ex) { Log.warn("Exception while closing scanner", ex); } } - if (this.region != null) { + if (this.closeRegion && this.region != null) { try { - this.region.closeRegionOperation(); - this.region.close(true); - this.region = null; + if (this.closeRegion) { + ((HRegion)this.region).close(true); + } } catch (IOException ex) { Log.warn("Exception while closing region", ex); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index bc52edb..bc3245f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -2053,6 +2053,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { if (isAborted() || isStopped()) { return; } + this.abortRequested = true; if (cpHost != null) { // HBASE-4014: dump a list of loaded coprocessors. LOG.fatal("Master server abort: loaded coprocessors are: " + diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 3718a5a..ada98b9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -109,6 +110,14 @@ public class MasterFileSystem { public MasterFileSystem(Server master, MasterServices services) throws IOException { + this(master, services, true, + new SplitLogManager(master, master.getConfiguration(), master, services, + master.getServerName())); + } + + public MasterFileSystem(Server master, MasterServices services, + boolean initializeFsLayout, SplitLogManager splitLogManager) + throws IOException { this.conf = master.getConfiguration(); this.master = master; this.services = services; @@ -126,12 +135,14 @@ public class MasterFileSystem { fs.setConf(conf); // setup the filesystem variable // set up the archived logs path - this.oldLogDir = createInitialFileSystemLayout(); + this.oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME); + if (initializeFsLayout) { + createInitialFileSystemLayout(); + } HFileSystem.addLocationsOrderInterceptor(conf); - this.splitLogManager = - new SplitLogManager(master, master.getConfiguration(), master, services, - master.getServerName()); - this.distributedLogReplay = this.splitLogManager.isLogReplaying(); + this.splitLogManager = splitLogManager; + this.distributedLogReplay + = splitLogManager == null ? 
false : this.splitLogManager.isLogReplaying(); } @VisibleForTesting @@ -156,8 +167,6 @@ public class MasterFileSystem { // check if temp directory exists and clean it checkTempDir(this.tempdir, conf, this.fs); - Path oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME); - // Make sure the region servers can archive their old logs if(!this.fs.exists(oldLogDir)) { this.fs.mkdirs(oldLogDir); @@ -221,7 +230,7 @@ public class MasterFileSystem { * Inspect the log directory to find dead servers which need recovery work * @return A set of ServerNames which aren't running but still have WAL files left in file system */ - Set getFailedServersFromLogFolders() { + public Set getFailedServersFromLogFolders() { boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors", WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT); @@ -238,8 +247,10 @@ public class MasterFileSystem { FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null); // Get online servers after getting log folders to avoid log folder deletion of newly // checked in region servers . see HBASE-5916 - Set onlineServers = ((HMaster) master).getServerManager().getOnlineServers() - .keySet(); + Set onlineServers = Collections.emptySet(); + if (services != null) { + onlineServers = services.getServerManager().getOnlineServers().keySet(); + } if (logFolders == null || logFolders.length == 0) { LOG.debug("No log files to split, proceeding..."); @@ -317,10 +328,10 @@ public class MasterFileSystem { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", justification= "We only release this lock when we set it. Updates to code that uses it should verify use " + "of the guard boolean.") - private List getLogDirs(final Set serverNames) throws IOException { + public List renameWALDirs(final Set serverNames) throws IOException { List logDirs = new ArrayList(); boolean needReleaseLock = false; - if (!this.services.isInitialized()) { + if (this.services != null && !this.services.isInitialized()) { // during master initialization, we could have multiple places splitting a same wal this.splitLogLock.lock(); needReleaseLock = true; @@ -336,7 +347,9 @@ public class MasterFileSystem { throw new IOException("Failed fs.rename for log split: " + logDir); } logDir = splitDir; - LOG.debug("Renamed region directory: " + splitDir); + if (LOG.isDebugEnabled()) { + LOG.debug("Renamed WAL directory: " + splitDir); + } } else if (!fs.exists(splitDir)) { LOG.info("Log dir for server " + serverName + " does not exist"); continue; @@ -392,7 +405,7 @@ public class MasterFileSystem { */ public void splitLog(final Set serverNames, PathFilter filter) throws IOException { long splitTime = 0, splitLogSize = 0; - List logDirs = getLogDirs(serverNames); + List logDirs = renameWALDirs(serverNames); splitLogManager.handleDeadWorkers(serverNames); splitTime = EnvironmentEdgeManager.currentTime(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index de82692..81a3e78 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -75,8 +75,8 @@ import com.google.common.annotations.VisibleForTesting; *

SplitLogManager monitors the tasks that it creates using the * timeoutMonitor thread. If a task's progress is slow then * {@link SplitLogManagerCoordination#checkTasks} will take away the - * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} - * and the task will be up for grabs again. When the task is done then it is + * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} + * and the task will be up for grabs again. When the task is done then it is * deleted by SplitLogManager. * *

Clients call {@link #splitLogDistributed(Path)} to split a region server's @@ -176,7 +176,6 @@ public class SplitLogManager { * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem, * Configuration, WALFactory)} for tests. */ - @VisibleForTesting public static FileStatus[] getFileList(final Configuration conf, final List logDirs, final PathFilter filter) throws IOException { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/region/RegionServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/region/RegionServices.java new file mode 100644 index 0000000..6a4335c --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/region/RegionServices.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.region; + +import java.util.Map; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.regionserver.CompactionRequestor; +import org.apache.hadoop.hbase.regionserver.FlushRequester; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; +import org.apache.hadoop.hbase.regionserver.OnlineRegions; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; +import org.apache.hadoop.hbase.regionserver.ServerNonceManager; + +/** + * Services related to hosting regions. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public interface RegionServices extends OnlineRegions, Stoppable, Abortable { + // TODO: Guava Service? + + /** + * @return True if this region service is stopping. + */ + boolean isStopping(); + + /** + * @return Return the FileSystem object used by the region service + */ + FileSystem getFileSystem(); + + /** + * @return Implementation of {@link CompactionRequestor} or null. + */ + CompactionRequestor getCompactionRequester(); + + /** + * @return Implementation of {@link FlushRequester} or null. + */ + FlushRequester getFlushRequester(); + + // TODO: RegionServiceAccounting? + /** + * @return the RegionServerAccounting for this Region Server + */ + RegionServerAccounting getRegionServerAccounting(); + + /** + * @return heap memory manager instance + */ + HeapMemoryManager getHeapMemoryManager(); + + + // TODO: HRegion should not depend on the below + /** + * Only required for "old" log replay; if it's removed, remove this. 
+ * @return The RegionServer's NonceManager + */ + public ServerNonceManager getNonceManager(); + + /** + * @return set of recovering regions on the hosting region server + */ + Map getRecoveringRegions(); + +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java index 930baf0..83f2c63 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java @@ -93,4 +93,10 @@ public interface CompactionRequestor { CompactionRequest requestCompaction( final Region r, final Store s, final String why, int pri, CompactionRequest request ) throws IOException; + + public void requestSystemCompaction( + final Region r, final String why) throws IOException; + + public void requestSystemCompaction( + final Region r, final Store s, final String why) throws IOException; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 7e70f11..bf6907e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -6457,7 +6457,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Perform atomic mutations within the region w/o nonces. * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)} */ - public void mutateRowsWithLocks(Collection mutations, + public void mutateRowsWithLocks(Collection mutations, Collection rowsToLock) throws IOException { mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE); } @@ -6474,8 +6474,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * rowsToLock is sorted in order to avoid deadlocks. 
* @throws IOException */ + @Override - public void mutateRowsWithLocks(Collection mutations, + public void mutateRowsWithLocks(Collection mutations, Collection rowsToLock, long nonceGroup, long nonce) throws IOException { MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock); processRowsWithLocks(proc, -1, nonceGroup, nonce); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index a62b6d7..f078a05 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -80,6 +80,7 @@ public class HRegionFileSystem { private final Configuration conf; private final Path tableDir; private final FileSystem fs; + private final boolean deleteWithoutArchiving; /** * In order to handle NN connectivity hiccups, one need to retry non-idempotent operation at the @@ -97,7 +98,7 @@ public class HRegionFileSystem { * @param tableDir {@link Path} to where the table is being stored * @param regionInfo {@link HRegionInfo} for region */ - HRegionFileSystem(final Configuration conf, final FileSystem fs, final Path tableDir, + public HRegionFileSystem(final Configuration conf, final FileSystem fs, final Path tableDir, final HRegionInfo regionInfo) { this.fs = fs; this.conf = conf; @@ -108,6 +109,7 @@ public class HRegionFileSystem { DEFAULT_HDFS_CLIENT_RETRIES_NUMBER); this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries", DEFAULT_BASE_SLEEP_BEFORE_RETRIES); + this.deleteWithoutArchiving = conf.getBoolean("hbase.hfile.delete.without.archiving", false); } /** @return the underlying {@link FileSystem} */ @@ -422,8 +424,12 @@ public class HRegionFileSystem { */ public void removeStoreFiles(final String familyName, final Collection storeFiles) throws IOException { - HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfoForFs, + if (deleteWithoutArchiving) { + HFileArchiver.deleteStoreFilesWithoutArchiving(storeFiles); + } else { + HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfoForFs, this.tableDir, Bytes.toBytes(familyName), storeFiles); + } } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7666057..99b2a11 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -44,8 +44,8 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.management.MalformedObjectNameException; @@ -132,6 +132,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; +import 
org.apache.hadoop.hbase.region.RegionServices; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; @@ -156,8 +157,9 @@ import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALContainer; +import org.apache.hadoop.hbase.wal.WALContainer.FSWALContext; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; @@ -295,7 +297,7 @@ public class HRegionServer extends HasThread implements // Go down hard. Used if file system becomes unavailable and also in // debugging and unit tests. - private volatile boolean abortRequested; + protected volatile boolean abortRequested; ConcurrentMap rowlocks = new ConcurrentHashMap(); @@ -347,20 +349,14 @@ public class HRegionServer extends HasThread implements /* * Check for compactions requests. */ - ScheduledChore compactionChecker; + CompactionChecker compactionChecker; /* * Check for flushes */ - ScheduledChore periodicFlusher; + PeriodicMemstoreFlusher periodicFlusher; - protected volatile WALFactory walFactory; - - // WAL roller. log is protected rather than private to avoid - // eclipse warning when accessed by inner classes - final LogRoller walRoller; - // Lazily initialized if this RegionServer hosts a meta table. - final AtomicReference metawalRoller = new AtomicReference(); + protected final WALContainer walContainer; // flag set after we're done setting up server threads final AtomicBoolean online = new AtomicBoolean(false); @@ -597,8 +593,10 @@ public class HRegionServer extends HasThread implements rpcServices.start(); putUpWebUI(); - this.walRoller = new LogRoller(this, this); + this.walContainer = new WALContainer(this, this, uncaughtExceptionHandler); this.choreService = new ChoreService(getServerName().toString()); + + initializeFlushCompactionThreads(); } protected TableDescriptors getFsTableDescriptors() throws IOException { @@ -821,7 +819,7 @@ public class HRegionServer extends HasThread implements return clusterStatusTracker != null && clusterStatusTracker.isClusterUp(); } - private void initializeThreads() throws IOException { + private void initializeFlushCompactionThreads() { // Cache flushing thread. this.cacheFlusher = new MemStoreFlusher(conf, this); @@ -831,8 +829,12 @@ public class HRegionServer extends HasThread implements // Background thread to check for compactions; needed if region has not gotten updates // in a while. It will take care of not checking too frequently on store-by-store basis. 
this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this); - this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this); - this.leases = new Leases(this.threadWakeFrequency); + this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, + this.serverName, this); + } + + private void initializeThreads() throws IOException { + this.leases = new Leases(this.threadWakeFrequency); // Create the thread to clean the moved regions list movedRegionsCleaner = MovedRegionsCleaner.create(this); @@ -1299,19 +1301,17 @@ public class HRegionServer extends HasThread implements } private void shutdownWAL(final boolean close) { - if (this.walFactory != null) { - try { - if (close) { - walFactory.close(); - } else { - walFactory.shutdown(); - } - } catch (Throwable e) { - e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; - LOG.error("Shutdown / close of WAL failed: " + e); - LOG.debug("Shutdown / close exception details:", e); - } - } + walContainer.shutdownWAL(close); + } + + @Override + public WALFactory getWALFactory() { + return walContainer.getWALFactory(); + } + + @Override + public void requestRollAll() { + walContainer.requestRollAll(); } /* @@ -1362,7 +1362,7 @@ public class HRegionServer extends HasThread implements ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath()); this.cacheConfig = new CacheConfig(conf); - this.walFactory = setupWALAndReplication(); + setupWALAndReplication(); // Init in here rather than in constructor after thread name has been set this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this)); @@ -1500,7 +1500,7 @@ public class HRegionServer extends HasThread implements * Inner class that runs on a long period checking if regions need compaction. */ private static class CompactionChecker extends ScheduledChore { - private final HRegionServer instance; + private final List regionServices = new CopyOnWriteArrayList<>(); private final int majorCompactPriority; private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; private long iteration = 0; @@ -1508,44 +1508,52 @@ public class HRegionServer extends HasThread implements CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { super("CompactionChecker", stopper, sleepTime); - this.instance = h; + regionServices.add(h); LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime)); /* MajorCompactPriority is configurable. * If not set, the compaction will use default priority. */ - this.majorCompactPriority = this.instance.conf. - getInt("hbase.regionserver.compactionChecker.majorCompactPriority", - DEFAULT_PRIORITY); + this.majorCompactPriority = h.getConfiguration() + .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY); + } + + private void addRegionServices(RegionServices rs) { + this.regionServices.add(rs); + } + private void removeRegionServices(RegionServices rs) { + this.regionServices.remove(rs); } @Override protected void chore() { - for (Region r : this.instance.onlineRegions.values()) { - if (r == null) - continue; - for (Store s : r.getStores()) { - try { - long multiplier = s.getCompactionCheckMultiplier(); - assert multiplier > 0; - if (iteration % multiplier != 0) continue; - if (s.needsCompaction()) { - // Queue a compaction. Will recognize if major is needed. 
- this.instance.compactSplitThread.requestSystemCompaction(r, s, getName() + for (RegionServices rs : regionServices) { + for (Region r : rs.getOnlineRegions()) { + if (r == null) + continue; + for (Store s : r.getStores()) { + try { + long multiplier = s.getCompactionCheckMultiplier(); + assert multiplier > 0; + if (iteration % multiplier != 0) continue; + if (s.needsCompaction()) { + // Queue a compaction. Will recognize if major is needed. + rs.getCompactionRequester().requestSystemCompaction(r, s, getName() + " requests compaction"); - } else if (s.isMajorCompaction()) { - if (majorCompactPriority == DEFAULT_PRIORITY - || majorCompactPriority > ((HRegion)r).getCompactPriority()) { - this.instance.compactSplitThread.requestCompaction(r, s, getName() + } else if (s.isMajorCompaction()) { + if (majorCompactPriority == DEFAULT_PRIORITY + || majorCompactPriority > ((HRegion)r).getCompactPriority()) { + rs.getCompactionRequester().requestCompaction(r, s, getName() + " requests major compaction; use default priority", null); - } else { - this.instance.compactSplitThread.requestCompaction(r, s, getName() + } else { + rs.getCompactionRequester().requestCompaction(r, s, getName() + " requests major compaction; use configured priority", - this.majorCompactPriority, null); + this.majorCompactPriority, null); + } } + } catch (IOException e) { + LOG.warn("Failed major compaction check on " + r, e); } - } catch (IOException e) { - LOG.warn("Failed major compaction check on " + r, e); } } } @@ -1553,32 +1561,66 @@ public class HRegionServer extends HasThread implements } } + /** + * RegionServer hosts some chores (MemstoreFlusher, periodic flusher, CompactionChecker, etc) + * that some other RegionServices may want to share. This method adds the passed RegionServices + * to the chores so that the chores also see their regions. 
+ * @param rs the region services to send to chore threads + */ + @InterfaceAudience.Private + public void addRegionServices(RegionServices rs) { + this.periodicFlusher.addRegionServices(rs); + this.compactionChecker.addRegionServices(rs); + } + + /** + * Removed the region services from region servers maintained chores + * @param rs the region services to remove from chore threeads + */ + @InterfaceAudience.Private + public void removeRegionServices(RegionServices rs) { + this.periodicFlusher.removeRegionServices(rs); + this.compactionChecker.removeRegionServices(rs); + } + + @InterfaceAudience.Private static class PeriodicMemstoreFlusher extends ScheduledChore { - final HRegionServer server; + final List regionServices = new CopyOnWriteArrayList<>(); final static int RANGE_OF_DELAY = 20000; //millisec final static int MIN_DELAY_TIME = 3000; //millisec - public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) { - super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval); - this.server = server; + public PeriodicMemstoreFlusher(int cacheFlushInterval, final ServerName serverName, + final RegionServices server) { + super(serverName + "-MemstoreFlusherChore", server, cacheFlushInterval); + regionServices.add(server); + } + + public void addRegionServices(RegionServices rs) { + this.regionServices.add(rs); + } + + public void removeRegionServices(RegionServices rs) { + this.regionServices.remove(rs); } @Override protected void chore() { - final StringBuffer whyFlush = new StringBuffer(); - for (Region r : this.server.onlineRegions.values()) { - if (r == null) continue; - if (((HRegion)r).shouldFlush(whyFlush)) { - FlushRequester requester = server.getFlushRequester(); - if (requester != null) { - long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME; - LOG.info(getName() + " requesting flush of " + - r.getRegionInfo().getRegionNameAsString() + " because " + - whyFlush.toString() + - " after random delay " + randomDelay + "ms"); - //Throttle the flushes by putting a delay. If we don't throttle, and there - //is a balanced write-load on the regions in a table, we might end up - //overwhelming the filesystem with too many flushes at once. - requester.requestDelayedFlush(r, randomDelay, false); + for (RegionServices rs : regionServices) { + final StringBuffer whyFlush = new StringBuffer(); + for (Region r : rs.getOnlineRegions()) { + if (r == null) continue; + if (((HRegion)r).shouldFlush(whyFlush)) { + FlushRequester requester = rs.getFlushRequester(); + if (requester != null) { + long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME; + LOG.info(getName() + " requesting flush of " + + r.getRegionInfo().getRegionNameAsString() + " because " + + whyFlush.toString() + + " after random delay " + randomDelay + "ms"); + //Throttle the flushes by putting a delay. If we don't throttle, and there + //is a balanced write-load on the regions in a table, we might end up + //overwhelming the filesystem with too many flushes at once. + requester.requestDelayedFlush(r, randomDelay, false); + } } } } @@ -1602,21 +1644,14 @@ public class HRegionServer extends HasThread implements * @return A WAL instance. 
* @throws IOException */ - private WALFactory setupWALAndReplication() throws IOException { + private void setupWALAndReplication() throws IOException { // TODO Replication make assumptions here based on the default filesystem impl - final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); - final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString()); - - Path logdir = new Path(rootDir, logName); - if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir); - if (this.fs.exists(logdir)) { - throw new RegionServerRunningException("Region server has already " + - "created directory at " + this.serverName.toString()); - } + FSWALContext fsWAlContext = walContainer.checkServerWALDir(this.fs, rootDir, this.serverName); // Instantiate replication manager if replication enabled. Pass it the // log directories. - createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir); + createNewReplicationInstance(conf, this, this.fs, fsWAlContext.serverWalDir, + fsWAlContext.walArchiveDir); // listeners the wal factory will add to wals it creates. final List listeners = new ArrayList(); @@ -1627,35 +1662,7 @@ public class HRegionServer extends HasThread implements listeners.add(this.replicationSourceHandler.getWALActionsListener()); } - return new WALFactory(conf, listeners, serverName.toString()); - } - - /** - * We initialize the roller for the wal that handles meta lazily - * since we don't know if this regionserver will handle it. All calls to - * this method return a reference to the that same roller. As newly referenced - * meta regions are brought online, they will be offered to the roller for maintenance. - * As a part of that registration process, the roller will add itself as a - * listener on the wal. 
- */ - protected LogRoller ensureMetaWALRoller() { - // Using a tmp log roller to ensure metaLogRoller is alive once it is not - // null - LogRoller roller = metawalRoller.get(); - if (null == roller) { - LogRoller tmpLogRoller = new LogRoller(this, this); - String n = Thread.currentThread().getName(); - Threads.setDaemonThreadRunning(tmpLogRoller.getThread(), - n + "-MetaLogRoller", uncaughtExceptionHandler); - if (metawalRoller.compareAndSet(null, tmpLogRoller)) { - roller = tmpLogRoller; - } else { - // Another thread won starting the roller - Threads.shutdown(tmpLogRoller.getThread()); - roller = metawalRoller.get(); - } - } - return roller; + walContainer.setupWAL(conf, serverName, listeners); } public MetricsRegionServer getRegionServerMetrics() { @@ -1704,8 +1711,8 @@ public class HRegionServer extends HasThread implements conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); } - Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", - uncaughtExceptionHandler); + this.walContainer.startServiceThreads(getName()); + this.cacheFlusher.start(uncaughtExceptionHandler); if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker); @@ -1742,7 +1749,8 @@ public class HRegionServer extends HasThread implements sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); - this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory); + this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, + walContainer.getWALFactory()); splitLogWorker.start(); } @@ -1812,38 +1820,18 @@ public class HRegionServer extends HasThread implements } // Verify that all threads are alive if (!(leases.isAlive() - && cacheFlusher.isAlive() && walRoller.isAlive() + && cacheFlusher.isAlive() && walContainer.isServiceThreadsAlive() && this.compactionChecker.isScheduled() && this.periodicFlusher.isScheduled())) { stop("One or more threads are no longer alive -- stop"); return false; } - final LogRoller metawalRoller = this.metawalRoller.get(); - if (metawalRoller != null && !metawalRoller.isAlive()) { - stop("Meta WAL roller thread is no longer alive -- stop"); - return false; - } return true; } - private static final byte[] UNSPECIFIED_REGION = new byte[]{}; - @Override public WAL getWAL(HRegionInfo regionInfo) throws IOException { - WAL wal; - LogRoller roller = walRoller; - //_ROOT_ and hbase:meta regions have separate WAL. 
- if (regionInfo != null && regionInfo.isMetaTable() && - regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) { - roller = ensureMetaWALRoller(); - wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes()); - } else if (regionInfo == null) { - wal = walFactory.getWAL(UNSPECIFIED_REGION); - } else { - wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes()); - } - roller.addWAL(wal); - return wal; + return walContainer.getWAL(regionInfo); } @Override @@ -2146,12 +2134,8 @@ public class HRegionServer extends HasThread implements if (this.spanReceiverHost != null) { this.spanReceiverHost.closeReceivers(); } - if (this.walRoller != null) { - Threads.shutdown(this.walRoller.getThread()); - } - final LogRoller metawalRoller = this.metawalRoller.get(); - if (metawalRoller != null) { - Threads.shutdown(metawalRoller.getThread()); + if (this.walContainer != null) { + walContainer.stopServiceThreads(); } if (this.compactSplitThread != null) { this.compactSplitThread.join(); @@ -2700,6 +2684,11 @@ public class HRegionServer extends HasThread implements return tableRegions; } + @Override + public Collection getOnlineRegions() { + return onlineRegions.values(); + } + /** * Gets the online tables in this RS. * This method looks at the in-memory onlineRegions. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 7649ac9..0fe9d36 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.region.RegionServices; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @@ -56,7 +57,7 @@ public class LogRoller extends HasThread { private final ConcurrentHashMap walNeedsRoll = new ConcurrentHashMap(); private final Server server; - protected final RegionServerServices services; + protected final RegionServices services; private volatile long lastrolltime = System.currentTimeMillis(); // Period to roll log. 
private final long rollperiod; @@ -89,7 +90,7 @@ public class LogRoller extends HasThread { } /** @param server */ - public LogRoller(final Server server, final RegionServerServices services) { + public LogRoller(final Server server, final RegionServices services) { super(); this.server = server; this.services = services; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index ff8c308..8597cc3 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -298,7 +298,7 @@ class MetricsRegionServerWrapperImpl public long getNumStores() { return numStores; } - + @Override public long getNumWALFiles() { return numWALFiles; @@ -308,7 +308,7 @@ class MetricsRegionServerWrapperImpl public long getWALFileSize() { return walFileSize; } - + @Override public long getNumStoreFiles() { return numStoreFiles; @@ -523,10 +523,10 @@ class MetricsRegionServerWrapperImpl } lastRan = currentTime; - numWALFiles = DefaultWALProvider.getNumLogFiles(regionServer.walFactory) + - BoundedRegionGroupingProvider.getNumLogFiles(regionServer.walFactory); - walFileSize = DefaultWALProvider.getLogFileSize(regionServer.walFactory) + - BoundedRegionGroupingProvider.getLogFileSize(regionServer.walFactory); + numWALFiles = DefaultWALProvider.getNumLogFiles(regionServer.getWALFactory()) + + BoundedRegionGroupingProvider.getNumLogFiles(regionServer.getWALFactory()); + walFileSize = DefaultWALProvider.getLogFileSize(regionServer.getWALFactory()) + + BoundedRegionGroupingProvider.getLogFileSize(regionServer.getWALFactory()); //Copy over computed values so that no thread sees half computed values. 
numStores = tempNumStores; numStoreFiles = tempNumStoreFiles; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java index 1947a1b..64c156e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java @@ -42,10 +42,10 @@ import org.apache.hadoop.hbase.util.Bytes; class MultiRowMutationProcessor extends BaseRowProcessor { Collection rowsToLock; - Collection mutations; + Collection mutations; MiniBatchOperationInProgress miniBatch; - MultiRowMutationProcessor(Collection mutations, + MultiRowMutationProcessor(Collection mutations, Collection rowsToLock) { this.rowsToLock = rowsToLock; this.mutations = mutations; @@ -60,7 +60,7 @@ MultiRowMutationProcessorResponse> { public boolean readOnly() { return false; } - + @Override public MultiRowMutationProcessorResponse getResult() { return MultiRowMutationProcessorResponse.getDefaultInstance(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java index 60fc9fb..41ccb7e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java @@ -19,13 +19,13 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.Collection; import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; /** @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.ServerName; */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving -public interface OnlineRegions extends Server { +public interface OnlineRegions { /** * Add to online regions. * @param r @@ -67,4 +67,12 @@ public interface OnlineRegions extends Server { * @throws java.io.IOException */ List getOnlineRegions(TableName tableName) throws IOException; + + /** + * Get all online regions. This may not be a clone of the list. Returned list should be treated + * carefully. 
+ * @return List of Region + * @throws java.io.IOException + */ + Collection getOnlineRegions(); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index d7be4b4..7ea3a99 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1763,7 +1763,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, checkOpen(); requestCount.increment(); regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest(); - regionServer.walRoller.requestRollAll(); + regionServer.walContainer.requestRollAll(); regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest(); RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder(); return builder.build(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 9da99ab..e3632a7 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -438,7 +438,7 @@ public interface Region extends ConfigurationObserver { * rowsToLock is sorted in order to avoid deadlocks. * @throws IOException */ - void mutateRowsWithLocks(Collection mutations, Collection rowsToLock, + void mutateRowsWithLocks(Collection mutations, Collection rowsToLock, long nonceGroup, long nonce) throws IOException; /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index cd4816c..91087e7 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -34,7 +35,8 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; -import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.region.RegionServices; +import org.apache.hadoop.hbase.wal.WALServices; import org.apache.zookeeper.KeeperException; import com.google.protobuf.Service; @@ -44,29 +46,25 @@ import com.google.protobuf.Service; */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving -public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegion { - /** - * @return True if this regionserver is stopping. - */ - boolean isStopping(); +public interface RegionServerServices + extends RegionServices, WALServices, FavoredNodesForRegion, Server { - /** @return the WAL for a particular region. 
Pass null for getting the - * default (common) WAL */ - WAL getWAL(HRegionInfo regionInfo) throws IOException; + // TODO: Implementation note: + // Currently RegionServerServices consists of services related to hosting regions and other RS + // related things like RPC. Region related hosting services should be kept in RegionServices while + // RS specific services should be kept here. Ideally, HRegion should not know about + // RegionServerServices, but RegionServices /** - * @return Implementation of {@link CompactionRequestor} or null. - */ - CompactionRequestor getCompactionRequester(); - - /** - * @return Implementation of {@link FlushRequester} or null. + * @return True if this regionserver is stopping. */ - FlushRequester getFlushRequester(); + @Override + boolean isStopping(); /** * @return the RegionServerAccounting for this Region Server */ + @Override RegionServerAccounting getRegionServerAccounting(); /** @@ -182,6 +180,7 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi /** * @return Return the FileSystem object used by the regionserver */ + @Override FileSystem getFileSystem(); /** @@ -197,12 +196,14 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi /** * @return set of recovering regions on the hosting region server */ + @Override Map getRecoveringRegions(); /** * Only required for "old" log replay; if it's removed, remove this. * @return The RegionServer's NonceManager */ + @Override public ServerNonceManager getNonceManager(); /** @@ -222,6 +223,7 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi /** * @return heap memory manager instance */ + @Override HeapMemoryManager getHeapMemoryManager(); /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index fa69d63..3b8757c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -131,7 +131,7 @@ public class FSHLog implements WAL { // Calls to append now also wait until the append has been done on the consumer side of the // disruptor. We used to not wait but it makes the implemenation easier to grok if we have // the region edit/sequence id after the append returns. - // + // // TODO: Handlers need to coordinate appending AND syncing. Can we have the threads contend // once only? Probably hard given syncs take way longer than an append. // @@ -222,7 +222,7 @@ public class FSHLog implements WAL { private final String logFilePrefix; /** - * Suffix included on generated wal file names + * Suffix included on generated wal file names */ private final String logFileSuffix; @@ -245,7 +245,7 @@ public class FSHLog implements WAL { public void registerWALActionsListener(final WALActionsListener listener) { this.listeners.add(listener); } - + @Override public boolean unregisterWALActionsListener(final WALActionsListener listener) { return this.listeners.remove(listener); @@ -322,6 +322,8 @@ public class FSHLog implements WAL { // If > than this size, roll the log. 
private final long logrollsize; + private final boolean deleteWithoutArchiving; + /** * The total size of wal */ @@ -491,6 +493,9 @@ public class FSHLog implements WAL { registerWALActionsListener(i); } } + + deleteWithoutArchiving = conf.getBoolean("hbase.wal.delete.without.archiving", false); + this.coprocessorHost = new WALCoprocessorHost(this, conf); // Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks @@ -606,7 +611,7 @@ public class FSHLog implements WAL { /** * Tell listeners about pre log roll. - * @throws IOException + * @throws IOException */ private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) throws IOException { @@ -619,7 +624,7 @@ public class FSHLog implements WAL { /** * Tell listeners about post log roll. - * @throws IOException + * @throws IOException */ private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath) throws IOException { @@ -887,9 +892,17 @@ public class FSHLog implements WAL { i.preLogArchive(p, newPath); } } - LOG.info("Archiving " + p + " to " + newPath); - if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { - throw new IOException("Unable to rename " + p + " to " + newPath); + + if (deleteWithoutArchiving) { + LOG.info("Deleting " + p); + if (!FSUtils.delete(fs, p, false)) { + throw new IOException("Unable to delete " + p); + } + } else { + LOG.info("Archiving " + p + " to " + newPath); + if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { + throw new IOException("Unable to rename " + p + " to " + newPath); + } } // Tell our listeners that a log has been archived. if (!this.listeners.isEmpty()) { @@ -953,23 +966,7 @@ public class FSHLog implements WAL { final FileStatus[] files = getFiles(); if (null != files && 0 != files.length) { for (FileStatus file : files) { - Path p = getWALArchivePath(this.fullPathArchiveDir, file.getPath()); - // Tell our listeners that a log is going to be archived. - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.preLogArchive(file.getPath(), p); - } - } - - if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) { - throw new IOException("Unable to rename " + file.getPath() + " to " + p); - } - // Tell our listeners that a log was archived. - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.postLogArchive(file.getPath(), p); - } - } + archiveLogFile(file.getPath()); } LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.fullPathArchiveDir)); @@ -1035,12 +1032,12 @@ public class FSHLog implements WAL { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce); } - + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION", justification="Will never be null") @Override public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key, - final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, + final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, final List memstoreCells) throws IOException { if (this.closed) throw new IOException("Cannot append; log is closed"); // Make a trace scope for the append. 
It is closed on other side of the ring buffer by the @@ -1083,9 +1080,9 @@ public class FSHLog implements WAL { private class SyncRunner extends HasThread { private volatile long sequence; private final BlockingQueue syncFutures; - + /** - * UPDATE! + * UPDATE! * @param syncs the batch of calls to sync that arrived as this thread was starting; when done, * we will put the result of the actual hdfs sync call as the result. * @param sequence The sequence number on the ring buffer when this thread was set running. @@ -1133,7 +1130,7 @@ public class FSHLog implements WAL { // This function releases one sync future only. return 1; } - + /** * Release all SyncFutures whose sequence is <= currentSequence. * @param currentSequence @@ -1172,6 +1169,7 @@ public class FSHLog implements WAL { return sequence; } + @Override public void run() { long currentSequence; while (!isInterrupted()) { @@ -1566,7 +1564,7 @@ public class FSHLog implements WAL { * 'safe point' while the orchestrating thread does some work that requires the first thread * paused: e.g. holding the WAL writer while its WAL is swapped out from under it by another * thread. - * + * *

Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A wait until * Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A. * Thread B then holds at the 'safe point'. Thread A on notification that Thread B is paused, @@ -1574,7 +1572,7 @@ public class FSHLog implements WAL { * it flags B and then Thread A and Thread B continue along on their merry way. Pause and * signalling 'zigzags' between the two participating threads. We use two latches -- one the * inverse of the other -- pausing and signaling when states are achieved. - * + * *

To start up the drama, Thread A creates an instance of this class each time it would do * this zigzag dance and passes it to Thread B (these classes use Latches so it is one shot * only). Thread B notices the new instance (via reading a volatile reference or how ever) and it @@ -1596,7 +1594,7 @@ public class FSHLog implements WAL { * Latch to wait on. Will be released when we can proceed. */ private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1); - + /** * For Thread A to call when it is ready to wait on the 'safe point' to be attained. * Thread A will be held in here until Thread B calls {@link #safePointAttained()} @@ -1605,7 +1603,7 @@ public class FSHLog implements WAL { * @throws InterruptedException * @throws ExecutionException * @return The passed syncFuture - * @throws FailedSyncBeforeLogCloseException + * @throws FailedSyncBeforeLogCloseException */ SyncFuture waitSafePoint(final SyncFuture syncFuture) throws InterruptedException, FailedSyncBeforeLogCloseException { @@ -1617,7 +1615,7 @@ public class FSHLog implements WAL { } return syncFuture; } - + /** * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} @@ -1812,8 +1810,8 @@ public class FSHLog implements WAL { // so region sequenceids will also be in order. regionSequenceId = entry.stampRegionSequenceId(); - // Edits are empty, there is nothing to append. Maybe empty when we are looking for a - // region sequence id only, a region edit/sequence id that is not associated with an actual + // Edits are empty, there is nothing to append. Maybe empty when we are looking for a + // region sequence id only, a region edit/sequence id that is not associated with an actual // edit. It has to go through all the rigmarole to be sure we have the right ordering. if (entry.getEdit().isEmpty()) { return; @@ -1922,7 +1920,7 @@ public class FSHLog implements WAL { System.exit(-1); } } - + /** * Find the 'getPipeline' on the passed os stream. * @return Method or null. @@ -1969,4 +1967,4 @@ public class FSHLog implements WAL { } return new DatanodeInfo[0]; } -} \ No newline at end of file +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java index cce37d7..18156cb 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java @@ -125,11 +125,13 @@ public class FSTableDescriptors implements TableDescriptors { this.metaTableDescritor = TableDescriptor.metaTableDescriptor(conf); } + @Override public void setCacheOn() throws IOException { this.cache.clear(); this.usecache = true; } + @Override public void setCacheOff() throws IOException { this.usecache = false; this.cache.clear(); @@ -175,6 +177,8 @@ public class FSTableDescriptors implements TableDescriptors { } catch (NullPointerException e) { LOG.debug("Exception during readTableDecriptor. Current table name = " + tablename, e); + } catch (TableInfoMissingException e) { + // ignore. This is regular operation } catch (IOException ioe) { LOG.debug("Exception during readTableDecriptor. 
Current table name = " + tablename, ioe);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALContainer.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALContainer.java
new file mode 100644
index 0000000..19bed26
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALContainer.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.region.RegionServices;
+import org.apache.hadoop.hbase.regionserver.LogRoller;
+import org.apache.hadoop.hbase.regionserver.RegionServerRunningException;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.ipc.RemoteException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A concrete implementation of WALServices. WALContainer hosts the WALFactory and LogRoller(s).
+ */
+@InterfaceAudience.Private
+public class WALContainer implements WALServices {
+  private static final Log LOG = LogFactory.getLog(WALContainer.class);
+
+  private static final byte[] UNSPECIFIED_REGION = new byte[]{};
+
+  private final Server server;
+  private final RegionServices regionServices;
+  private final UncaughtExceptionHandler uncaughtExceptionHandler;
+
+  protected volatile WALFactory walFactory;
+
+  // WAL roller. log is protected rather than private to avoid
+  // eclipse warning when accessed by inner classes
+  final LogRoller walRoller;
+  // Lazily initialized if this container hosts a meta table.
+  final AtomicReference metawalRoller = new AtomicReference();
+
+  public WALContainer(final Server server, final RegionServices regionServices,
+      UncaughtExceptionHandler uncaughtExceptionHandler) {
+    this.walRoller = new LogRoller(server, regionServices);
+    this.server = server;
+    this.regionServices = regionServices;
+    this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+  }
+
+  @Override
+  public WAL getWAL(HRegionInfo regionInfo) throws IOException {
+    WAL wal;
+    LogRoller roller = walRoller;
+    //_ROOT_ and hbase:meta regions have separate WAL.
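+    // Only the default replica of hbase:meta is routed to the dedicated meta WAL and its
+    // lazily created roller. A null regionInfo falls back to the common WAL keyed by
+    // UNSPECIFIED_REGION; any other region gets the WAL the factory picks for its encoded
+    // name. In both of the latter cases the shared walRoller maintains the WAL.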
+ if (regionInfo != null && regionInfo.isMetaTable() && + regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) { + roller = ensureMetaWALRoller(); + wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes()); + } else if (regionInfo == null) { + wal = walFactory.getWAL(UNSPECIFIED_REGION); + } else { + wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes()); + } + roller.addWAL(wal); + return wal; + } + + public static class FSWALContext { + public FileSystem fs; + /** Directory for WALs for all servers*/ + public Path walDir; + /** Archive directory for WALs */ + public Path walArchiveDir; + /** Directory for WALs for this server*/ + public Path serverWalDir; + + public FSWALContext(FileSystem fs, Path walDir, Path walArchiveDir, Path serverWalDir) { + this.fs = fs; + this.walDir = walDir; + this.walArchiveDir = walArchiveDir; + this.serverWalDir = serverWalDir; + } + } + + public FSWALContext checkServerWALDir(FileSystem fs, Path rootDir, ServerName serverName) + throws IOException { + final Path walArchiveDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + final Path walDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); + final String serverWalDirName = DefaultWALProvider.getWALDirectoryName(serverName.toString()); + + Path serverWalDir = new Path(rootDir, serverWalDirName); + if (LOG.isDebugEnabled()) LOG.debug("logdir=" + serverWalDir); + if (fs.exists(serverWalDir)) { + throw new RegionServerRunningException("Region server has already " + + "created directory at " + serverName.toString()); + } + return new FSWALContext(fs, walDir, walArchiveDir, serverWalDir); + } + + /** + * Setup WAL + * @throws IOException + */ + public void setupWAL(Configuration conf, ServerName serverName, + List listeners) throws IOException { + this.walFactory = new WALFactory(conf, listeners, serverName.toString()); + } + + @Override + public WALFactory getWALFactory() { + return this.walFactory; + } + + @Override + public void requestRollAll() { + walRoller.requestRollAll(); + } + + public void startServiceThreads(String daemonName) { + Threads.setDaemonThreadRunning(this.walRoller.getThread(), daemonName + ".logRoller", + uncaughtExceptionHandler); + } + + public void stopServiceThreads() { + if (this.walRoller != null) { + Threads.shutdown(this.walRoller.getThread()); + } + final LogRoller metawalRoller = this.metawalRoller.get(); + if (metawalRoller != null) { + Threads.shutdown(metawalRoller.getThread()); + } + } + + public boolean isServiceThreadsAlive() { + if (!walRoller.isAlive()) { + return false; + } + final LogRoller metawalRoller = this.metawalRoller.get(); + if (metawalRoller != null && !metawalRoller.isAlive()) { + return false; + } + return true; + } + + /** + * We initialize the roller for the wal that handles meta lazily + * since we don't know if this regionserver will handle it. All calls to + * this method return a reference to the that same roller. As newly referenced + * meta regions are brought online, they will be offered to the roller for maintenance. + * As a part of that registration process, the roller will add itself as a + * listener on the wal. 
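+   * If two threads race to do this initialization, the compareAndSet below guarantees that
+   * exactly one roller survives; the losing thread's freshly started roller is shut down
+   * again before the surviving one is returned.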
+ */ + protected LogRoller ensureMetaWALRoller() { + // Using a tmp log roller to ensure metaLogRoller is alive once it is not + // null + LogRoller roller = metawalRoller.get(); + if (null == roller) { + LogRoller tmpLogRoller = new LogRoller(server, regionServices); + String n = Thread.currentThread().getName(); + Threads.setDaemonThreadRunning(tmpLogRoller.getThread(), + n + "-MetaLogRoller", uncaughtExceptionHandler); + if (metawalRoller.compareAndSet(null, tmpLogRoller)) { + roller = tmpLogRoller; + } else { + // Another thread won starting the roller + Threads.shutdown(tmpLogRoller.getThread()); + roller = metawalRoller.get(); + } + } + return roller; + } + + public void shutdownWAL(final boolean close) { + if (this.walFactory != null) { + try { + if (close) { + walFactory.close(); + } else { + walFactory.shutdown(); + } + } catch (Throwable e) { + e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; + LOG.error("Shutdown / close of WAL failed: " + e); + LOG.debug("Shutdown / close exception details:", e); + } + } + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALServices.java new file mode 100644 index 0000000..a51cd51 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALServices.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Services related to hosting and managing WAL + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public interface WALServices { + + /** + * Return the WALFactory used by this WALServices + * @return WALFactory used + */ + WALFactory getWALFactory(); + + /** + * Returns the WAL for a particular region. Pass null for getting the + * default (common) WAL. + * @return the WAL + */ + WAL getWAL(HRegionInfo regionInfo) throws IOException; + + /** + * Request a roll of all WALs. 
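+   * In the WALContainer implementation above, the roll is carried out asynchronously by the
+   * hosted LogRoller thread(s); this call only requests it.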
+ */ + void requestRollAll(); + +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 9a26a24..5c2569b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -209,7 +209,6 @@ public class WALSplitter { this.distributedLogReplay = false; outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads); } - } /** @@ -280,6 +279,7 @@ public class WALSplitter { boolean progress_failed = false; int editsCount = 0; int editsSkipped = 0; + long start = System.nanoTime(); status = TaskMonitor.get().createStatus( @@ -399,10 +399,13 @@ public class WALSplitter { progress_failed = outputSink.finishWritingAndClose() == null; } } finally { + long end = System.nanoTime(); + long elapsedMs = TimeUnit.MILLISECONDS.convert(end - start, TimeUnit.NANOSECONDS); String msg = "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions() + " regions; edits skipped=" + editsSkipped + "; log file=" + logPath + ", length=" + logfile.getLen() + // See if length got updated post lease recovery + ", in ms=" + elapsedMs + ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed; LOG.info(msg); status.markComplete(msg); @@ -488,13 +491,21 @@ public class WALSplitter { } } + boolean deleteWithoutArchiving = conf.getBoolean("hbase.wal.delete.without.archiving", false); + for (Path p : processedLogs) { Path newPath = FSHLog.getWALArchivePath(oldLogDir, p); if (fs.exists(p)) { - if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) { - LOG.warn("Unable to move " + p + " to " + newPath); + if (deleteWithoutArchiving) { + if (!FSUtils.delete(fs, p, false)) { + LOG.warn("Unable to delete " + p); + } } else { - LOG.info("Archived processed log " + p + " to " + newPath); + if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) { + LOG.warn("Unable to move " + p + " to " + newPath); + } else { + LOG.info("Archived processed log " + p + " to " + newPath); + } } } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index e8b79a8..10df8c2 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -2289,6 +2289,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return rowCount; } + public void loadNumericRowsMultiPut(final Table t, final byte[] f, int startRow, int endRow) + throws IOException { + ArrayList puts = new ArrayList<>(endRow - startRow); + for (int i = startRow; i < endRow; i++) { + byte[] data = Bytes.toBytes(String.valueOf(i)); + Put put = new Put(data); + put.add(f, null, data); + puts.add(put); + } + t.put(puts); + } + public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow) throws IOException { for (int i = startRow; i < endRow; i++) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index c126b19..f8a026f 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase; import 
java.io.IOException; import java.net.InetSocketAddress; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -105,6 +107,11 @@ public class MockRegionServerServices implements RegionServerServices { } @Override + public Collection getOnlineRegions() { + return null; + } + + @Override public Set getOnlineTables() { return null; } @@ -306,4 +313,16 @@ public class MockRegionServerServices implements RegionServerServices { public double getCompactionPressure() { return 0; } + + @Override + public WALFactory getWALFactory() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void requestRollAll() { + // TODO Auto-generated method stub + + } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index eb8f803..2a0d353 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -105,6 +106,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -544,6 +546,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { } @Override + public Collection getOnlineRegions() { + // TODO Auto-generated method stub + return null; + } + + @Override public Set getOnlineTables() { // TODO Auto-generated method stub return null; @@ -651,4 +659,16 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { public double getCompactionPressure() { return 0; } + + @Override + public WALFactory getWALFactory() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void requestRollAll() { + // TODO Auto-generated method stub + + } } \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index c6aaf67..4406048 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -501,7 +501,7 @@ public class TestPerColumnFamilyFlush { assertTrue(desiredRegion.getStore(FAMILY3).getMemStoreSize() < cfFlushSizeLowerBound); table.put(createPut(1, 12345678)); // Make numRolledLogFiles greater than maxLogs - 
desiredRegionAndServer.getSecond().walRoller.requestRollAll(); + desiredRegionAndServer.getSecond().requestRollAll(); // Wait for some time till the flush caused by log rolling happens. TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate() {
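As a quick orientation for the new WAL hosting classes added above, the following is an illustrative sketch (not part of the patch) of how a hosting server might drive the WALContainer lifecycle. The Server, RegionServices and HRegionInfo instances are assumed to be supplied by the caller, and error handling is omitted.

import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.region.RegionServices;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALContainer;

public class WALContainerUsageSketch {
  public static WAL openWAL(Server server, RegionServices regionServices, HRegionInfo hri)
      throws IOException {
    Configuration conf = server.getConfiguration();
    ServerName serverName = server.getServerName();
    WALContainer container = new WALContainer(server, regionServices,
        new Thread.UncaughtExceptionHandler() {
          @Override
          public void uncaughtException(Thread t, Throwable e) {
            // a real server would log the error and abort here
          }
        });
    // Create the WALFactory for this server and start the log-rolling thread.
    container.setupWAL(conf, serverName, Collections.<WALActionsListener>emptyList());
    container.startServiceThreads(serverName.toString());
    // hbase:meta gets its own WAL and roller; everything else shares the common WAL.
    return container.getWAL(hri);
  }
}

On shutdown the same caller would invoke container.stopServiceThreads() and container.shutdownWAL(true), mirroring the methods defined on WALContainer above.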