diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5c8ccd2..15d147c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -189,9 +189,9 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager; import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.MultiWALGrouper; +import org.apache.hadoop.hbase.regionserver.wal.WALGrouperFactory; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.trace.SpanReceiverHost; import org.apache.hadoop.hbase.util.Bytes; @@ -391,16 +391,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa */ Chore periodicFlusher; - // HLog and HLog roller. log is protected rather than private to avoid - // eclipse warning when accessed by inner classes - protected volatile HLog hlog; - // The meta updates are written to a different hlog. If this - // regionserver holds meta regions, then this field will be non-null. - protected volatile HLog hlogForMeta; - - LogRoller hlogRoller; - LogRoller metaHLogRoller; - // flag set after we're done setting up server threads (used for testing) protected volatile boolean isOnline; @@ -480,6 +470,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa // Table level lock manager for locking for region operations private TableLockManager tableLockManager; + + // Log grouper + private MultiWALGrouper walGrouper; /** * Starts a HRegionServer at the default location @@ -888,8 +881,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa // TODO: Should we check they are alive? If OOME could have exited already if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary(); - if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary(); - if (this.metaHLogRoller != null) this.metaHLogRoller.interruptIfNecessary(); + this.walGrouper.interruptRollers(); if (this.compactionChecker != null) this.compactionChecker.interrupt(); if (this.healthCheckChore != null) { @@ -1030,7 +1022,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount()); serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024)); serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024)); - Set coprocessors = this.hlog.getCoprocessorHost().getCoprocessors(); + //Set coprocessors = this.hlog.getCoprocessorHost().getCoprocessors(); + Set coprocessors = this.getCoprocessorHost().getCoprocessors(); for (String coprocessor : coprocessors) { serverLoad.addCoprocessors( Coprocessor.newBuilder().setName(coprocessor).build()); @@ -1106,7 +1099,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } private void closeWAL(final boolean delete) { - if (this.hlogForMeta != null) { + /*if (this.hlogForMeta != null) { // All hlogs (meta and non-meta) are in the same directory. Don't call // closeAndDelete here since that would delete all hlogs not just the // meta ones. We will just 'close' the hlog for meta here, and leave @@ -1127,6 +1120,11 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } catch (Throwable e) { LOG.error("Close and delete failed", RemoteExceptionHandler.checkThrowable(e)); } + }*/ + try { + this.walGrouper.closeAllWALs(delete); + } catch (Throwable e) { + LOG.error("Close and delete failed", RemoteExceptionHandler.checkThrowable(e)); } } @@ -1192,7 +1190,10 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa this.fs = new HFileSystem(this.conf, this.useHBaseChecksum); this.rootDir = FSUtils.getRootDir(this.conf); this.tableDescriptors = new FSTableDescriptors(this.fs, this.rootDir, true); - this.hlog = setupWALAndReplication(); + setUpWALReplication(); + this.walGrouper = WALGrouperFactory.getWALGrouper(conf); + this.walGrouper.init(conf, this, this, this.replicationSourceHandler, + this.serverNameFromMasterPOV, uncaughtExceptionHandler); // Init in here rather than in constructor after thread name has been set this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this)); @@ -1408,13 +1409,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa return isOnline; } - /** - * Setup WAL log and replication if enabled. - * Replication setup is done in here because it wants to be hooked up to WAL. - * @return A WAL instance. - * @throws IOException - */ - private HLog setupWALAndReplication() throws IOException { + // We need only the logDir name here + private void setUpWALReplication() throws IOException { final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); final String logName = HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString()); @@ -1429,68 +1425,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa // Instantiate replication manager if replication enabled. Pass it the // log directories. createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir); - - return instantiateHLog(rootDir, logName); } - private HLog getMetaWAL() throws IOException { - if (this.hlogForMeta != null) return this.hlogForMeta; - final String logName = HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString()); - Path logdir = new Path(rootDir, logName); - if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir); - this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(), rootDir, logName, - this.conf, getMetaWALActionListeners(), this.serverNameFromMasterPOV.toString()); - return this.hlogForMeta; - } - - /** - * Called by {@link #setupWALAndReplication()} creating WAL instance. - * @param rootdir - * @param logName - * @return WAL instance. - * @throws IOException - */ - protected HLog instantiateHLog(Path rootdir, String logName) throws IOException { - return HLogFactory.createHLog(this.fs.getBackingFs(), rootdir, logName, this.conf, - getWALActionListeners(), this.serverNameFromMasterPOV.toString()); - } - - /** - * Called by {@link #instantiateHLog(Path, String)} setting up WAL instance. - * Add any {@link WALActionsListener}s you want inserted before WAL startup. - * @return List of WALActionsListener that will be passed in to - * {@link org.apache.hadoop.hbase.regionserver.wal.FSHLog} on construction. - */ - protected List getWALActionListeners() { - List listeners = new ArrayList(); - // Log roller. - this.hlogRoller = new LogRoller(this, this); - listeners.add(this.hlogRoller); - if (this.replicationSourceHandler != null && - this.replicationSourceHandler.getWALActionsListener() != null) { - // Replication handler is an implementation of WALActionsListener. - listeners.add(this.replicationSourceHandler.getWALActionsListener()); - } - return listeners; - } - - protected List getMetaWALActionListeners() { - List listeners = new ArrayList(); - // Using a tmp log roller to ensure metaLogRoller is alive once it is not - // null - MetaLogRoller tmpLogRoller = new MetaLogRoller(this, this); - String n = Thread.currentThread().getName(); - Threads.setDaemonThreadRunning(tmpLogRoller.getThread(), - n + "-MetaLogRoller", uncaughtExceptionHandler); - this.metaHLogRoller = tmpLogRoller; - tmpLogRoller = null; - listeners.add(this.metaHLogRoller); - return listeners; - } - - protected LogRoller getLogRoller() { - return hlogRoller; - } public MetricsRegionServer getMetrics() { return this.metricsRegionServer; @@ -1531,9 +1467,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); } - - Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller", - uncaughtExceptionHandler); + // No use in starting here + this.walGrouper.startLogRollers(); this.cacheFlusher.start(uncaughtExceptionHandler); Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n + ".compactionChecker", uncaughtExceptionHandler); @@ -1627,16 +1562,12 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } // Verify that all threads are alive if (!(leases.isAlive() - && cacheFlusher.isAlive() && hlogRoller.isAlive() + && cacheFlusher.isAlive() && this.walGrouper.isRollerAlive() && this.compactionChecker.isAlive()) && this.periodicFlusher.isAlive()) { stop("One or more threads are no longer alive -- stop"); return false; } - if (metaHLogRoller != null && !metaHLogRoller.isAlive()) { - stop("Meta HLog roller thread is no longer alive -- stop"); - return false; - } return true; } @@ -1651,14 +1582,11 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa @Override public HLog getWAL(HRegionInfo regionInfo) throws IOException { + return this.walGrouper.getWAL(regionInfo); //TODO: at some point this should delegate to the HLogFactory //currently, we don't care about the region as much as we care about the //table.. (hence checking the tablename below) //_ROOT_ and hbase:meta regions have separate WAL. - if (regionInfo != null && regionInfo.isMetaTable()) { - return getMetaWAL(); - } - return this.hlog; } @Override @@ -1804,12 +1732,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa if (this.spanReceiverHost != null) { this.spanReceiverHost.closeReceivers(); } - if (this.hlogRoller != null) { - Threads.shutdown(this.hlogRoller.getThread()); - } - if (this.metaHLogRoller != null) { - Threads.shutdown(this.metaHLogRoller.getThread()); - } + this.walGrouper.shutDownLogRollers(); if (this.compactSplitThread != null) { this.compactSplitThread.join(); } @@ -2347,7 +2270,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070). public String[] getCoprocessors() { TreeSet coprocessors = new TreeSet( - this.hlog.getCoprocessorHost().getCoprocessors()); + this.getCoprocessorHost().getCoprocessors()); Collection regions = getOnlineRegionsLocalContext(); for (HRegion region: regions) { coprocessors.addAll(region.getCoprocessorHost().getCoprocessors()); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 9488749..f7e44d6 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -43,7 +43,7 @@ import java.util.concurrent.locks.ReentrantLock; * sleep time which is invariant. */ @InterfaceAudience.Private -class LogRoller extends HasThread implements WALActionsListener { +public class LogRoller extends HasThread implements WALActionsListener { static final Log LOG = LogFactory.getLog(LogRoller.class); private final ReentrantLock rollLock = new ReentrantLock(); private final AtomicBoolean rollLog = new AtomicBoolean(false); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index a9fa5cc..d1d504b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -44,6 +44,7 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi /** @return the HLog for a particular region. Pass null for getting the * default (common) WAL */ + // TODO : We could change this - need not pass null HLog getWAL(HRegionInfo regionInfo) throws IOException; /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java index 4c86544..008458a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java @@ -466,7 +466,7 @@ public class OpenRegionHandler extends EventHandler { /** * @return Instance of HRegion if successful open else null. */ - HRegion openRegion() { + public HRegion openRegion() { HRegion region = null; try { // Instantiate the region. This also periodically tickles our zk OPENING diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DefaultGrouper.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DefaultGrouper.java new file mode 100644 index 0000000..b49a6d1 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DefaultGrouper.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.regionserver.LogRoller; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; + +public class DefaultGrouper implements MultiWALGrouper { + + static final Log LOG = LogFactory.getLog(DefaultGrouper.class); + RegionServerServices rsServices; + Server server; + static Configuration conf; + protected FileSystem fs; + protected Path rootDir; + protected volatile HLog hlogForMeta; + protected volatile HLog hLogForRS; + ReplicationSourceService replicationSourceHandler; + LogListCollector logListCollector; + List rollers; + UncaughtExceptionHandler uncaughtExceptionHandler; + ServerName serverNameFromMasterPOV; + AtomicBoolean boolForMeta = new AtomicBoolean(false); + AtomicBoolean boolForWAL = new AtomicBoolean(false); + + public DefaultGrouper() { + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public void init(Configuration conf, RegionServerServices rsServices, Server server, + ReplicationSourceService replicationSourceHandler, ServerName serverNameFromMasterPOV, + UncaughtExceptionHandler uncaughtExceptionHandler) throws IOException { + this.conf = conf; + this.rsServices = rsServices; + FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf)); + // Get fs instance used by this RS. For now pass true + this.rootDir = FSUtils.getRootDir(this.conf); + this.fs = this.rootDir.getFileSystem(this.conf); + this.server = server; + this.replicationSourceHandler = replicationSourceHandler; + this.uncaughtExceptionHandler = uncaughtExceptionHandler; + this.serverNameFromMasterPOV = serverNameFromMasterPOV; + logListCollector = new DefaultLogListCollector(); + } + + @Override + // Check if the synchronization using AtomicBoolean is sufficient + public HLog getWAL(HRegionInfo regionInfo) throws IOException { + if (regionInfo != null && regionInfo.isMetaTable()) { + if (boolForMeta.get() == false) { + synchronized (boolForMeta) { + boolean initialized = boolForMeta.get(); + if (!initialized) { + hlogForMeta = createNewHLog(true, logListCollector); + boolForMeta.compareAndSet(false, true); + } + } + } + return hlogForMeta; + } else { + if (boolForWAL.get() == false) { + synchronized (boolForWAL) { + boolean initialized = boolForWAL.get(); + if (!initialized) { + hLogForRS = createNewHLog(false, logListCollector); + boolForWAL.compareAndSet(false, true); + } + } + } + return hLogForRS; + } + } + + /** + * Always instantiates a new HLog and also setups the replication + * + * @return + * @throws IOException + */ + protected HLog createNewHLog(boolean forMeta, LogListCollector listCollector) throws IOException { + final String logName = HLogUtil.getHLogDirectoryName(rsServices.getServerName().toString()); + HLog hlog = HLogFactory.createHLog(fs, this.rootDir, logName, conf, + getWALActionListeners(forMeta), this.serverNameFromMasterPOV.toString(), forMeta); + listCollector.add(hlog, forMeta); + return hlog; + } + + /** + * Called by {@link #instantiateHLog(Path, String)} setting up WAL instance. + * Add any {@link WALActionsListener}s you want inserted before WAL startup. + * + * @return List of WALActionsListener that will be passed in to + * {@link org.apache.hadoop.hbase.regionserver.wal.FSHLog} on + * construction. + */ + @Override + public List getWALActionListeners(boolean forMeta) { + List listeners = new ArrayList(); + rollers = new ArrayList(); + // Log roller. + LogRoller roller = new LogRoller(server, rsServices); + startLogRoller(roller); + rollers.add(roller); + listeners.add(roller); + if (!forMeta) { + if (this.replicationSourceHandler != null + && this.replicationSourceHandler.getWALActionsListener() != null) { + // Replication handler is an implementation of WALActionsListener. + listeners.add(this.replicationSourceHandler.getWALActionsListener()); + } + } + return listeners; + } + + @Override + public List getLogRollers() { + return rollers; + } + + @Override + public boolean isRollerAlive() { + if (rollers != null) { + for (LogRoller roller : rollers) { + if (roller != null) { + if (roller.isAlive()) + return true; + } + } + } else { + return true; + } + return false; + } + + @Override + public void startLogRollers() { + if (rollers != null) { + for (LogRoller roller : rollers) { + if (roller != null) { + Threads.setDaemonThreadRunning(roller.getThread(), Thread.currentThread().getName() + + ".logRoller", uncaughtExceptionHandler); + } + } + } + } + + @Override + public void startLogRoller(LogRoller roller) { + Threads.setDaemonThreadRunning(roller.getThread(), Thread.currentThread().getName() + + ".logRoller", uncaughtExceptionHandler); + } + + @Override + public void shutDownLogRollers() { + if (rollers != null) { + for (LogRoller roller : rollers) { + if (roller != null) { + Threads.shutdown(roller.getThread()); + } + } + } + } + + @Override + public void interruptRollers() { + if (rollers != null) { + for (LogRoller roller : rollers) { + if (roller != null) { + roller.interruptIfNecessary(); + } + } + } + + } + + @Override + public void closeAllWALs(boolean delete) throws IOException { + CopyOnWriteArrayList hlogList = getLogListCollector().getHLogList(); + for (HLogHolder hlogHolder : hlogList) { + if (hlogHolder.getHLog() != null) { + if (!hlogHolder.isMetaHLog() && delete) { + hlogHolder.getHLog().closeAndDelete(); + } else { + hlogHolder.getHLog().close(); + } + } + } + } + + protected LogListCollector getLogListCollector() { + return this.logListCollector; + } + + /** + * A simple holder class that holds the HLog along with the boolean that says + * whether the HLog is for META. This would be used while closing the META + * HLog on RS close + * + */ + public static class HLogHolder { + private HLog hlog; + private boolean metaHLog; + + public HLogHolder(HLog hlog, boolean metaHLog) { + this.hlog = hlog; + this.metaHLog = metaHLog; + } + + public boolean isMetaHLog() { + return metaHLog; + } + + public HLog getHLog() { + return hlog; + } + } + + /** + * An interface that allows to collect the logs that were created This would + * be helpful where we could use this list to issue close of WALs + */ + public static interface LogListCollector { + // Adds the HLog to the List + public void add(HLog hlog, boolean forMeta); + + // Returns the HLogHolder list + public CopyOnWriteArrayList getHLogList(); + } + + static class DefaultLogListCollector implements LogListCollector { + private CopyOnWriteArrayList hlogList = new CopyOnWriteArrayList(); + + public void add(HLog hlog, boolean forMeta) { + hlogList.add(new HLogHolder(hlog, forMeta)); + } + + @Override + public CopyOnWriteArrayList getHLogList() { + return hlogList; + } + } + + @Override + public Configuration getConf() { + return this.conf; + } + + @Override + public WalGroupingStatus getWalGroupingStatus() { + // TODO Auto-generated method stub + return null; + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index b29bb71..2dc4744 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -27,7 +27,6 @@ import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -50,12 +49,12 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Syncable; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.DrainBarrier; @@ -288,6 +287,59 @@ class FSHLog implements HLog, Syncable { this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, prefix, false); } + + + /** + * Create an edit log at the given dir location. + * + * You should never have to load an existing log. If there is a log at + * startup, it should have already been processed and deleted by the time the + * HLog object is started up. + * + * @param fs filesystem handle + * @param root path for stored and archived hlogs + * @param logDir dir where hlogs are stored + * @param conf configuration to use + * @param listeners Listeners on WAL events. Listeners passed here will + * be registered before we do anything else; e.g. the + * Constructor {@link #rollWriter()}. + * @param prefix should always be hostname and port in distributed env and + * it will be URL encoded before being used. + * If prefix is null, "hlog" will be used + * @throws IOException + */ + public FSHLog(final FileSystem fs, final Path root, final String logDir, + final Configuration conf, final List listeners, + final String prefix, boolean forMeta) throws IOException { + this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, + conf, listeners, true, prefix, forMeta); + } + /** + * Create an edit log at the given dir location. + * + * You should never have to load an existing log. If there is a log at + * startup, it should have already been processed and deleted by the time the + * HLog object is started up. + * + * @param fs filesystem handle + * @param root path for stored and archived hlogs + * @param logDir dir where hlogs are stored + * @param conf configuration to use + * @param listeners Listeners on WAL events. Listeners passed here will + * be registered before we do anything else; e.g. the + * Constructor {@link #rollWriter()}. + * @param prefix should always be hostname and port in distributed env and + * it will be URL encoded before being used. + * If prefix is null, "hlog" will be used + * @throws IOException + */ + public FSHLog(final FileSystem fs, final Path root, final String logDir, + final Configuration conf, final List listeners, final boolean failIfDirExists, + final String prefix, boolean forMeta) throws IOException { + this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, + conf, listeners, failIfDirExists, prefix, forMeta); + } + /** * Create an edit log at the given dir location. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java index fb97f22..3d7da01 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java @@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; -import java.util.Arrays; import java.io.InterruptedIOException; +import java.util.Arrays; import java.util.List; import org.apache.commons.logging.Log; @@ -55,6 +55,13 @@ public class HLogFactory { final String prefix) throws IOException { return new FSHLog(fs, root, logName, conf, listeners, prefix); } + + public static HLog createHLog(final FileSystem fs, final Path root, final String logName, + final Configuration conf, final List listeners, + final String prefix, final boolean forMeta) throws IOException { + return new FSHLog(fs, root, logName, conf, listeners, false, prefix, forMeta); + } + public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName, final Configuration conf, final List listeners, diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MultiWALGrouper.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MultiWALGrouper.java new file mode 100644 index 0000000..f59e254 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MultiWALGrouper.java @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.List; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.regionserver.LogRoller; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; +/** + * A generic interface that provides facility for adding multiWAL facility per region server + * Default implementation would be - One WAL per RS for all regions and one wal for META + * Other implementations could be - one WAL per table, WAL grouped based on regions + * + */ +public interface MultiWALGrouper extends Configurable { + /** + * Initialize the MultiWALGrouper in the RS start up + * @param conf + * @param rsServices + * @param server + * @param replicationSourceHandler + * @param serverNameFromMasterPOV + * @param unCaughtExceptionHandler + * @throws IOException + */ + public void init(Configuration conf, RegionServerServices rsServices, Server server, + ReplicationSourceService replicationSourceHandler, ServerName serverNameFromMasterPOV, + UncaughtExceptionHandler unCaughtExceptionHandler) + throws IOException; + + /** + * Creates an HLog for the givenn region info. The WAL creation is configurable + * @param info + * @return + * @throws IOException + */ + public HLog getWAL(HRegionInfo info) throws IOException; + + /** + * Closes all the WALs created + * @param delete + * @throws IOException + */ + public void closeAllWALs(boolean delete) throws IOException; + // Every HLog created here should have a logRoller assocaited with it + /** + * Gets all the log rollers + * @return + */ + public List getLogRollers(); + + /** + * This would be needed for having logroller as listener for every log that is created + */ + public List getWALActionListeners(boolean forMeta); + + /** + * Check if any of the roller is alive + * @return + */ + public boolean isRollerAlive(); + + /** + * Start the log rollers + */ + public void startLogRollers(); + + /** + * Interrupt the logs rollers + */ + public void interruptRollers(); + + /** + * Starts the log roller associated with the logs created + * @param roller + */ + public void startLogRoller(LogRoller roller); + + /** + * Shuts down the log rollers associated with the logs created + */ + public void shutDownLogRollers(); + + //TODO : Will this be needed? + public WalGroupingStatus getWalGroupingStatus(); +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RoundRobinRegionBasedGrouper.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RoundRobinRegionBasedGrouper.java new file mode 100644 index 0000000..811b16b --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RoundRobinRegionBasedGrouper.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HRegionInfo; + +/** + * Creates hlogs in roundrobin fashion. For META there would be a seperate HLog + * file and for others there would be Hlog files as configured in the + * configuration 'hlog.region.grouping.count' and the regions would be assigned + * these Hlogs in roundrobin fashion + * + */ +public abstract class RoundRobinRegionBasedGrouper extends DefaultGrouper { + + @Override + public HLog getWAL(HRegionInfo regionInfo) throws IOException { + return null; + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/TableBasedGrouper.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/TableBasedGrouper.java new file mode 100644 index 0000000..e78e62b --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/TableBasedGrouper.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; + +/** + * Provides the implementation for table level grouping, which means that every + * new table will write to seperate logs Note : META will also be written to + * seperate HLog + */ +public class TableBasedGrouper extends DefaultGrouper { + + private ConcurrentHashMap tableVsWAL = new ConcurrentHashMap(); + + public TableBasedGrouper() { + } + + @Override + public HLog getWAL(HRegionInfo regionInfo) throws IOException { + TableName tableName = regionInfo.getTableName(); + HLog hlog; + // Synchronize per table + synchronized (tableName) { + hlog = tableVsWAL.get(tableName); + if (hlog == null) { + if (regionInfo.isMetaTable()) { + hlog = createNewHLog(true, getLogListCollector()); + } else { + hlog = createNewHLog(false, getLogListCollector()); + } + tableVsWAL.put(tableName, hlog); + } + } + return hlog; + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALGrouperFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALGrouperFactory.java new file mode 100644 index 0000000..45cb0b5 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALGrouperFactory.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; + +public class WALGrouperFactory { + + public static MultiWALGrouper getWALGrouper(Configuration conf) { + // Create the balancer + Class walGrouperKlass = + conf.getClass("hbase.regionserver.walgrouper.class", DefaultGrouper.class, + MultiWALGrouper.class); + return ReflectionUtils.newInstance(walGrouperKlass, conf); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WalGroupingStatus.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WalGroupingStatus.java new file mode 100644 index 0000000..9d14b50 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WalGroupingStatus.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.jboss.netty.util.internal.ConcurrentHashMap; + +public class WalGroupingStatus { + + private Map regionVsHLog = new ConcurrentHashMap(); + private Map> hLogVsRegionInfo = new ConcurrentHashMap>(); + + public Map getRegionVsHLogMapping() { + return regionVsHLog; + } + + public Map> getHLogVsRegionMapping() { + return hLogVsRegionInfo; + } +} \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 505c572..6804b6e 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -2114,7 +2114,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { rss.setFileSystem(getTestFileSystem()); return rss; } - + /** * Switches the logger for the given class to DEBUG level. * diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index b2c434c..bba6835 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.Leases; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.MultiWALGrouper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -46,7 +47,7 @@ import org.apache.zookeeper.KeeperException; /** * Basic mock region server services. Should only be instantiated by HBaseTestingUtility.b */ -class MockRegionServerServices implements RegionServerServices { +public class MockRegionServerServices implements RegionServerServices { private final Map regions = new HashMap(); private boolean stopping = false; private final ConcurrentSkipListMap rit = @@ -56,6 +57,7 @@ class MockRegionServerServices implements RegionServerServices { private ServerName serverName = null; private RpcServerInterface rpcServer = null; private volatile boolean abortRequested; + private MultiWALGrouper walGrouper; MockRegionServerServices(ZooKeeperWatcher zkw) { this.zkw = zkw; @@ -65,6 +67,12 @@ class MockRegionServerServices implements RegionServerServices { this.zkw = zkw; this.serverName = serverName; } + + MockRegionServerServices(ZooKeeperWatcher zkw, ServerName serverName, MultiWALGrouper walGrouper) { + this.zkw = zkw; + this.serverName = serverName; + this.walGrouper = walGrouper; + } MockRegionServerServices(){ this(null); @@ -182,6 +190,10 @@ class MockRegionServerServices implements RegionServerServices { public void setFileSystem(FileSystem hfs) { this.hfs = (HFileSystem)hfs; } + + public void setWALGrouper(MultiWALGrouper walGrouper) { + this.walGrouper = walGrouper; + } @Override public Leases getLeases() { @@ -190,7 +202,9 @@ class MockRegionServerServices implements RegionServerServices { @Override public HLog getWAL(HRegionInfo regionInfo) throws IOException { - // TODO Auto-generated method stub + if(walGrouper != null) { + return walGrouper.getWAL(regionInfo); + } return null; } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java index 7c7cf5a..85dd87e 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java @@ -97,7 +97,7 @@ public class TestOpenRegionHandler { assertNotNull(region); try { OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd) { - HRegion openRegion() { + public HRegion openRegion() { // Open region first, then remove znode as though it'd been hijacked. HRegion region = super.openRegion(); @@ -197,7 +197,7 @@ public class TestOpenRegionHandler { OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) { @Override - HRegion openRegion() { + public HRegion openRegion() { // Fake failure of opening a region due to an IOE, which is caught return null; }