From 17f7f3f38ab429651729eb9dc6ac5aac3ecea087 Mon Sep 17 00:00:00 2001 From: Sean Busbey Date: Sun, 26 Oct 2014 23:53:45 -0500 Subject: [PATCH] HBASE-12339 WALPerformanceEvaluation should have a log roller. --- .../hadoop/hbase/regionserver/LogRoller.java | 5 +++- .../hadoop/hbase/MockRegionServerServices.java | 30 ++++++++++++++------ .../hadoop/hbase/wal/WALPerformanceEvaluation.java | 20 +++++++++---- 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index aa60bfb..71dea3b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.ipc.RemoteException; +import com.google.common.annotations.VisibleForTesting; + /** * Runs periodically to determine if the WAL should be rolled. * @@ -46,7 +48,8 @@ import org.apache.hadoop.ipc.RemoteException; * TODO: change to a pool of threads */ @InterfaceAudience.Private -class LogRoller extends HasThread { +@VisibleForTesting +public class LogRoller extends HasThread { static final Log LOG = LogFactory.getLog(LogRoller.class); private final ReentrantLock rollLock = new ReentrantLock(); private final AtomicBoolean rollLog = new AtomicBoolean(false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 11f3a7a..e7111e2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -24,7 +24,10 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.client.ClusterConnection; @@ -53,28 +56,36 @@ import com.google.protobuf.Service; /** * Basic mock region server services. Should only be instantiated by HBaseTestingUtility.b */ -class MockRegionServerServices implements RegionServerServices { +public class MockRegionServerServices implements RegionServerServices { + protected static final Log LOG = LogFactory.getLog(MockRegionServerServices.class); private final Map regions = new HashMap(); - private boolean stopping = false; private final ConcurrentSkipListMap rit = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); private HFileSystem hfs = null; + private final Configuration conf; private ZooKeeperWatcher zkw = null; private ServerName serverName = null; private RpcServerInterface rpcServer = null; private volatile boolean abortRequested; + private volatile boolean stopping = false; + private final AtomicBoolean running = new AtomicBoolean(true); MockRegionServerServices(ZooKeeperWatcher zkw) { - this.zkw = zkw; + this(zkw, null); } MockRegionServerServices(ZooKeeperWatcher zkw, ServerName serverName) { this.zkw = zkw; this.serverName = serverName; + this.conf = (zkw == null ? new Configuration() : zkw.getConfiguration()); } MockRegionServerServices(){ - this(null); + this(null, null); + } + + public MockRegionServerServices(Configuration conf) { + this.conf = conf; } @Override @@ -179,7 +190,7 @@ class MockRegionServerServices implements RegionServerServices { @Override public Configuration getConfiguration() { - return zkw == null ? null : zkw.getConfiguration(); + return conf; } @Override @@ -190,12 +201,15 @@ class MockRegionServerServices implements RegionServerServices { @Override public void stop(String why) { - //no-op + this.stopping = true; + if (running.compareAndSet(true, false)) { + LOG.info("Shutting down due to request '" + why + "'"); + } } @Override public boolean isStopped() { - return false; + return !(running.get()); } @Override @@ -266,4 +280,4 @@ class MockRegionServerServices implements RegionServerServices { // TODO Auto-generated method stub return false; } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index bfc7147..bf8fba0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -44,15 +44,18 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MockRegionServerServices; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.LogRoller; import org.apache.hadoop.hbase.trace.SpanReceiverHost; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.htrace.Sampler; @@ -313,13 +316,16 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { final WALFactory wals = new WALFactory(getConf(), null, "wals"); final HRegion[] regions = new HRegion[numRegions]; final Runnable[] benchmarks = new Runnable[numRegions]; + final MockRegionServerServices mockServices = new MockRegionServerServices(getConf()); + final LogRoller roller = new LogRoller(mockServices, mockServices); + Threads.setDaemonThreadRunning(roller.getThread(), "WALPerfEval.logRoller"); try { for(int i = 0; i < numRegions; i++) { // Initialize Table Descriptor // a table per desired region means we can avoid carving up the key space final HTableDescriptor htd = createHTableDescriptor(i, numFamilies); - regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll); + regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller); benchmarks[i] = Trace.wrap(new WALPutBenchmark(regions[i], htd, numIterations, noSync, syncInterval, traceFreq)); } @@ -335,6 +341,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { } } if (verify) { + LOG.info("verifying written log entries."); Path dir = new Path(FSUtils.getRootDir(getConf()), DefaultWALProvider.getWALDirectoryName("wals")); long editCount = 0; @@ -351,11 +358,16 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { } } } finally { + mockServices.stop("test clean up."); for (int i = 0; i < numRegions; i++) { if (regions[i] != null) { closeRegion(regions[i]); } } + if (null != roller) { + LOG.info("shutting down log roller."); + Threads.shutdown(roller.getThread()); + } wals.shutdown(); // Remove the root dir for this test region if (cleanup) cleanRegionRootDir(fs, rootRegionDir); @@ -465,13 +477,14 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { private final Set walsListenedTo = new HashSet(); private HRegion openRegion(final FileSystem fs, final Path dir, final HTableDescriptor htd, - final WALFactory wals, final long whenToRoll) throws IOException { + final WALFactory wals, final long whenToRoll, final LogRoller roller) throws IOException { // Initialize HRegion HRegionInfo regionInfo = new HRegionInfo(htd.getTableName()); // Initialize WAL final WAL wal = wals.getWAL(regionInfo.getEncodedNameAsBytes()); // If we haven't already, attach a listener to this wal to handle rolls and metrics. if (walsListenedTo.add(wal)) { + roller.addWAL(wal); wal.registerWALActionsListener(new WALActionsListener.Base() { private int appends = 0; @@ -484,8 +497,6 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { // We used to do explicit call to rollWriter but changed it to a request // to avoid dead lock (there are less threads going on in this class than // in the regionserver -- regionserver does not have the issue). - // TODO I think this means no rolling actually happens; the request relies on there - // being a LogRoller. DefaultWALProvider.requestLogRoll(wal); } } @@ -502,7 +513,6 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { appendMeter.mark(size); } }); - wal.rollWriter(); } return HRegion.createHRegion(regionInfo, dir, getConf(), htd, wal); -- 1.7.10.2 (Apple Git-33)