Index: src/contrib/hbase/src/test/hbase-site.xml
===================================================================
--- src/contrib/hbase/src/test/hbase-site.xml (revision 580433)
+++ src/contrib/hbase/src/test/hbase-site.xml (working copy)
@@ -75,4 +75,8 @@
the master will notice a dead region server sooner. The default is 15 seconds.
+  <property>
+    <name>hbase.rootdir</name>
+    <value>/hbase</value>
+    <description>location of HBase instance in dfs</description>
+  </property>
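The added property follows the standard hadoop-site override pattern. As a
minimal sketch (not part of the patch), test code can read the value back
through the HBaseConfiguration and Path classes used elsewhere in this patch:

    Configuration conf = new HBaseConfiguration();
    // "/hbase" mirrors the value the test configuration above supplies
    Path rootdir = new Path(conf.get("hbase.rootdir", "/hbase"));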
Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
===================================================================
--- src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 580433)
+++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy)
@@ -33,14 +33,14 @@
/**
* This class creates a single process HBase cluster for junit testing.
* One thread is created for each server.
- *
+ *
* <p>TestCases do not need to subclass to start a HBaseCluster. Call
* {@link #startMaster(Configuration)} and
* {@link #startRegionServers(Configuration, int)} to startup master and
* region servers. Save off the returned values and pass them to
* {@link #shutdown(org.apache.hadoop.hbase.MiniHBaseCluster.MasterThread, List)}
* to shut it all down when done.
- *
+ *
*/
public class MiniHBaseCluster implements HConstants {
static final Logger LOG =
@@ -48,6 +48,7 @@
private Configuration conf;
private MiniDFSCluster cluster;
private FileSystem fs;
+ private boolean shutdownDFS;
private Path parentdir;
private MasterThread masterThread = null;
ArrayList<RegionServerThread> regionThreads =
@@ -56,21 +57,21 @@
/**
* Starts a MiniHBaseCluster on top of a new MiniDFSCluster
- *
+ *
* @param conf
* @param nRegionNodes
- * @throws IOException
+ * @throws IOException
*/
public MiniHBaseCluster(Configuration conf, int nRegionNodes)
throws IOException {
-
+
this(conf, nRegionNodes, true, true, true);
}
/**
* Start a MiniHBaseCluster. Use the native file system unless
* miniHdfsFilesystem is set to true.
- *
+ *
* @param conf
* @param nRegionNodes
* @param miniHdfsFilesystem
@@ -83,14 +84,20 @@
/**
* Starts a MiniHBaseCluster on top of an existing HDFSCluster
- *
- * Note that if you use this constructor, you should shut down the mini dfs
- * cluster in your test case.
- *
+ *
+ ****************************************************************************
+ * * * * * * N O T E * * * * *
+ *
+ * If you use this constructor, you should shut down the mini dfs cluster
+ * in your test case.
+ *
+ * * * * * * N O T E * * * * *
+ ****************************************************************************
+ *
* @param conf
* @param nRegionNodes
* @param dfsCluster
- * @throws IOException
+ * @throws IOException
*/
public MiniHBaseCluster(Configuration conf, int nRegionNodes,
MiniDFSCluster dfsCluster) throws IOException {
@@ -98,6 +105,7 @@
this.conf = conf;
this.fs = dfsCluster.getFileSystem();
this.cluster = dfsCluster;
+ this.shutdownDFS = false;
init(nRegionNodes);
}
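The NOTE above is load-bearing: this constructor records shutdownDFS = false,
so MiniHBaseCluster.shutdown() will leave DFS running. A minimal sketch of the
shutdown ordering a test case should follow (mirroring TestLogRolling later in
this patch):

    MiniDFSCluster dfs = new MiniDFSCluster(conf, 2, true, (String[]) null);
    MiniHBaseCluster hbase = new MiniHBaseCluster(conf, 1, dfs);
    try {
      // ... exercise the cluster ...
    } finally {
      hbase.shutdown(); // stops HBase only; shutdownDFS is false here
      dfs.shutdown();   // the test case owns the mini dfs cluster
    }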
@@ -110,17 +118,19 @@
* filesystem configured in conf.
* @param format the mini hdfs cluster
* @param deleteOnExit clean up mini hdfs files
- * @throws IOException
+ * @throws IOException
*/
public MiniHBaseCluster(Configuration conf, int nRegionNodes,
- final boolean miniHdfsFilesystem, boolean format, boolean deleteOnExit)
+ final boolean miniHdfsFilesystem, boolean format, boolean deleteOnExit)
throws IOException {
-
+
this.conf = conf;
this.deleteOnExit = deleteOnExit;
+ this.shutdownDFS = false;
if (miniHdfsFilesystem) {
this.cluster = new MiniDFSCluster(this.conf, 2, format, (String[])null);
this.fs = cluster.getFileSystem();
+ this.shutdownDFS = true;
} else {
this.cluster = null;
this.fs = FileSystem.get(conf);
@@ -139,7 +149,7 @@
throw e;
}
}
-
+
/** runs the master server */
public static class MasterThread extends Thread {
private final HMaster master;
@@ -147,20 +157,20 @@
super(m, "Master:" + m.getMasterAddress().toString());
this.master = m;
}
-
+
/** {@inheritDoc} */
@Override
public void run() {
LOG.info("Starting " + getName());
super.run();
}
-
+
/** @return master server */
public HMaster getMaster() {
return this.master;
}
}
-
+
/** runs region servers */
public static class RegionServerThread extends Thread {
private final HRegionServer regionServer;
@@ -168,20 +178,20 @@
super(r, "RegionServer:" + index);
this.regionServer = r;
}
-
+
/** {@inheritDoc} */
@Override
public void run() {
LOG.info("Starting " + getName());
super.run();
}
-
+
/** @return the region server */
public HRegionServer getRegionServer() {
return this.regionServer;
}
}
-
+
/**
* Use this method to start a master.
* If you want to start an hbase cluster
@@ -197,7 +207,7 @@
*/
public static MasterThread startMaster(final Configuration c)
throws IOException {
-
+
if(c.get(MASTER_ADDRESS) == null) {
c.set(MASTER_ADDRESS, "localhost:0");
}
@@ -221,7 +231,7 @@
*/
public static ArrayList<RegionServerThread> startRegionServers(
final Configuration c, final int count) throws IOException {
-
+
// Start the HRegionServers. Always have regionservers come up on
// port '0' so there won't be clashes over default port as unit tests
// start/stop ports at different times during the life of the test.
@@ -234,10 +244,10 @@
}
return threads;
}
-
+
/**
* Starts a region server thread running
- *
+ *
* @throws IOException
* @return Name of regionserver started.
*/
@@ -247,10 +257,10 @@
this.regionThreads.add(t);
return t.getName();
}
-
+
private static RegionServerThread startRegionServer(final Configuration c,
final int index)
- throws IOException {
+ throws IOException {
final HRegionServer hrs = new HRegionServer(c);
RegionServerThread t = new RegionServerThread(hrs, index);
t.setName("regionserver" +
@@ -261,14 +271,14 @@
/**
* Get the cluster on which this HBase cluster is running
- *
+ *
* @return MiniDFSCluster
*/
public MiniDFSCluster getDFSCluster() {
return cluster;
}
- /**
+ /**
* @return Returns the rpc address actually used by the master server, because
* the supplied port is not necessarily the actual port used.
*/
@@ -278,7 +288,7 @@
/**
* Cause a region server to exit without cleaning up
- *
+ *
* @param serverNumber
*/
public void abortRegionServer(int serverNumber) {
@@ -290,7 +300,7 @@
/**
* Shut down the specified region server cleanly
- *
+ *
* @param serverNumber
* @return the region server that was stopped
*/
@@ -320,7 +330,7 @@
}
return regionServerThread.getName();
}
-
+
/**
* Wait for Mini HBase Cluster to shut down.
*/
@@ -346,7 +356,7 @@
}
}
}
-
+
/**
* Shut down HBase cluster started by calling
* {@link #startMaster(Configuration)} and then
@@ -389,14 +399,17 @@
((masterThread != null)? masterThread.getName(): "0 masters") + " " +
regionServerThreads.size() + " region server(s)");
}
-
+
+ /**
+ * Shut down the mini HBase cluster
+ */
public void shutdown() {
MiniHBaseCluster.shutdown(this.masterThread, this.regionThreads);
-
+
try {
- if (cluster != null) {
+ if (shutdownDFS && cluster != null) {
FileSystem fs = cluster.getFileSystem();
-
+
LOG.info("Shutting down Mini DFS cluster");
cluster.shutdown();
@@ -405,10 +418,10 @@
fs.close();
}
}
-
+
} catch (IOException e) {
LOG.error("shutdown", e);
-
+
} finally {
// Delete all DFS files
if(deleteOnExit) {
@@ -428,7 +441,7 @@
}
f.delete();
}
-
+
/**
* Call flushCache on all regions on all participating regionservers.
* @throws IOException
Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
===================================================================
--- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java (revision 580433)
+++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java (working copy)
@@ -30,32 +30,35 @@
*/
public class TestDFSAbort extends HBaseClusterTestCase {
- /** constructor */
- public TestDFSAbort() {
- super();
- Logger.getRootLogger().setLevel(Level.WARN);
- Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
- }
-
/** {@inheritDoc} */
@Override
public void setUp() throws Exception {
- super.setUp();
- HTableDescriptor desc = new HTableDescriptor(getName());
- desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY_STR));
- HBaseAdmin admin = new HBaseAdmin(conf);
- admin.createTable(desc);
+ try {
+ super.setUp();
+ HTableDescriptor desc = new HTableDescriptor(getName());
+ desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY_STR));
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ admin.createTable(desc);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
}
/**
* @throws Exception
*/
public void testDFSAbort() throws Exception {
- // By now the Mini DFS is running, Mini HBase is running and we have
- // created a table. Now let's yank the rug out from HBase
- cluster.getDFSCluster().shutdown();
- // Now wait for Mini HBase Cluster to shut down
- cluster.join();
+ try {
+ // By now the Mini DFS is running, Mini HBase is running and we have
+ // created a table. Now let's yank the rug out from HBase
+ cluster.getDFSCluster().shutdown();
+ // Now wait for Mini HBase Cluster to shut down
+ cluster.join();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
}
/**
Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java
===================================================================
--- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (revision 0)
+++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (revision 0)
@@ -0,0 +1,193 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Test log deletion as logs are rolled.
+ */
+public class TestLogRolling extends HBaseTestCase {
+ private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
+ private MiniDFSCluster dfs;
+ private MiniHBaseCluster cluster;
+ private Path logdir;
+ private String tableName;
+ private byte[] value;
+
+ /**
+ * constructor
+ * @throws Exception
+ */
+ public TestLogRolling() throws Exception {
+ super();
+ this.dfs = null;
+ this.cluster = null;
+ this.logdir = null;
+ this.tableName = null;
+ this.value = null;
+
+ // We roll the log after every 256 writes
+ conf.setInt("hbase.regionserver.maxlogentries", 256);
+
+ // For less frequently updated regions flush after every 2 flushes
+ conf.setInt("hbase.hregion.memcache.optionalflushcount", 2);
+
+ // We flush the cache after every 8192 bytes
+ conf.setInt("hbase.hregion.memcache.flush.size", 8192);
+
+ // Make lease timeout longer, lease checks less frequent
+ conf.setInt("hbase.master.lease.period", 10 * 1000);
+ conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
+
+ // Increase the amount of time between client retries
+ conf.setLong("hbase.client.pause", 15 * 1000);
+
+ String className = this.getClass().getName();
+ StringBuilder v = new StringBuilder(className);
+ while (v.length() < 1000) {
+ v.append(className);
+ }
+ value = v.toString().getBytes(HConstants.UTF8_ENCODING);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void setUp() throws Exception {
+ try {
+ super.setUp();
+ dfs = new MiniDFSCluster(conf, 2, true, (String[]) null);
+ } catch (Exception e) {
+ LOG.fatal("error during setUp: ", e);
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void tearDown() throws Exception {
+ try {
+ super.tearDown();
+
+ if (cluster != null) { // shutdown mini HBase cluster
+ cluster.shutdown();
+ }
+
+ if (dfs != null) {
+ assertEquals(0, countLogFiles(true));
+ FileSystem fs = dfs.getFileSystem();
+ try {
+ dfs.shutdown();
+ } finally {
+ fs.close();
+ }
+ }
+ } catch (Exception e) {
+ LOG.fatal("error in tearDown", e);
+ throw e;
+ }
+ }
+
+ private void startAndWriteData() throws Exception {
+ cluster = new MiniHBaseCluster(conf, 1, dfs);
+ try {
+ Thread.sleep(10 * 1000); // Wait for region server to start
+ } catch (InterruptedException e) {
+ // ignored: proceed even if the wait is cut short
+ }
+
+ logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir;
+
+ // When the META table can be opened, the region servers are running
+ @SuppressWarnings("unused")
+ HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
+
+ // Create the test table and open it
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString()));
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ admin.createTable(desc);
+ HTable table = new HTable(conf, new Text(tableName));
+
+ for (int i = 1; i <= 2048; i++) { // 2048 writes should cause 8 log rolls
+ long lockid =
+ table.startUpdate(new Text("row" + String.format("%1$04d", i)));
+ table.put(lockid, HConstants.COLUMN_FAMILY, value);
+ table.commit(lockid);
+
+ if (i % 256 == 0) {
+ // After every 256 writes sleep to let the log roller run
+
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ // ignored: just shortens the pause before the next batch of writes
+ }
+ }
+ }
+ }
+
+ private int countLogFiles(boolean print) throws IOException {
+ Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {logdir});
+ if (print) {
+ for (int i = 0; i < logfiles.length; i++) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("logfile: " + logfiles[i].toString());
+ }
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("number of log files: " + logfiles.length);
+ }
+ return logfiles.length;
+ }
+
+ /**
+ * Tests that logs are deleted
+ *
+ * @throws Exception
+ */
+ public void testLogRolling() throws Exception {
+ tableName = getName();
+ // Force a region split after every 768KB
+ conf.setLong("hbase.hregion.max.filesize", 768L * 1024L);
+ try {
+ startAndWriteData();
+ LOG.info("Finished writing. Sleeping to let cache flusher and log roller run");
+ try {
+ // Wait for log roller and cache flusher to run a few times...
+ Thread.sleep(30L * 1000L);
+ } catch (InterruptedException e) {
+ LOG.info("Sleep interrupted", e);
+ }
+ LOG.info("Wake from sleep");
+ assertTrue(countLogFiles(true) <= 2);
+ } catch (Exception e) {
+ LOG.fatal("unexpected exception", e);
+ throw e;
+ }
+ }
+
+}
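As a back-of-the-envelope check of the knobs set in the constructor above
(sketch only, not part of the test):

    int writes = 2048;           // rows written by startAndWriteData()
    int maxLogEntries = 256;     // hbase.regionserver.maxlogentries
    int expectedRolls = writes / maxLogEntries; // 8 rolls, per the comment
    // After cache flushes allow old logs to be reclaimed, the test asserts
    // that at most 2 log files remain: countLogFiles(true) <= 2.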
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 580433)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy)
@@ -139,6 +139,7 @@
30 * 1000), stop);
}
+ /** {@inheritDoc} */
public void closing(final Text regionName) {
lock.writeLock().lock();
try {
@@ -154,6 +155,7 @@
}
}
+ /** {@inheritDoc} */
public void closed(final Text regionName) {
lock.writeLock().lock();
try {
@@ -458,9 +460,17 @@
// get it when the master is panicing because for instance
// the HDFS has been yanked out from under it. Be wary of
// this message.
- if (checkFileSystem()) {
- closeAllRegions();
- restart = true;
+ try {
+ if (checkFileSystem()) {
+ closeAllRegions();
+ restart = true;
+ }
+ } catch (Exception e) {
+ LOG.fatal("file system available check failed. " +
+ "Shutting down server.", e);
+ this.stopRequested.set(true);
+ this.fsOk = false;
+ this.abortRequested = true;
}
break;
@@ -944,7 +954,8 @@
/** {@inheritDoc} */
public byte [] get(final Text regionName, final Text row,
final Text column) throws IOException {
-
+
+ checkOpen();
requestCount.incrementAndGet();
try {
return getRegion(regionName).get(row, column);
@@ -958,7 +969,8 @@
/** {@inheritDoc} */
public byte [][] get(final Text regionName, final Text row,
final Text column, final int numVersions) throws IOException {
-
+
+ checkOpen();
requestCount.incrementAndGet();
try {
return getRegion(regionName).get(row, column, numVersions);
@@ -972,7 +984,8 @@
/** {@inheritDoc} */
public byte [][] get(final Text regionName, final Text row, final Text column,
final long timestamp, final int numVersions) throws IOException {
-
+
+ checkOpen();
requestCount.incrementAndGet();
try {
return getRegion(regionName).get(row, column, timestamp, numVersions);
@@ -986,7 +999,8 @@
/** {@inheritDoc} */
public MapWritable getRow(final Text regionName, final Text row)
throws IOException {
-
+
+ checkOpen();
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
@@ -1006,7 +1020,8 @@
/** {@inheritDoc} */
public MapWritable next(final long scannerId) throws IOException {
-
+
+ checkOpen();
requestCount.incrementAndGet();
try {
String scannerName = String.valueOf(scannerId);
@@ -1044,7 +1059,9 @@
/** {@inheritDoc} */
public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
- throws IOException {
+ throws IOException {
+
+ checkOpen();
requestCount.incrementAndGet();
// If timestamp == LATEST_TIMESTAMP and we have deletes, then they need
// special treatment. For these we need to first find the latest cell so
@@ -1097,7 +1114,8 @@
public long openScanner(Text regionName, Text[] cols, Text firstRow,
final long timestamp, final RowFilterInterface filter)
throws IOException {
-
+
+ checkOpen();
requestCount.incrementAndGet();
try {
HRegion r = getRegion(regionName);
@@ -1112,7 +1130,7 @@
leases.createLease(scannerId, scannerId, new ScannerListener(scannerName));
return scannerId;
} catch (IOException e) {
- LOG.error("Opening scanner (fsOk: " + this.fsOk + ")",
+ LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
RemoteExceptionHandler.checkIOException(e));
checkFileSystem();
throw e;
@@ -1121,6 +1139,7 @@
/** {@inheritDoc} */
public void close(final long scannerId) throws IOException {
+ checkOpen();
requestCount.incrementAndGet();
try {
String scannerName = String.valueOf(scannerId);
@@ -1257,6 +1276,20 @@
}
/**
+ * Called to verify that this server is up and running.
+ *
+ * @throws IOException
+ */
+ private void checkOpen() throws IOException {
+ if (stopRequested.get() || abortRequested) {
+ throw new IOException("Server not running");
+ }
+ if (!fsOk) {
+ throw new IOException("File system not available");
+ }
+ }
+
+ /**
* Checks to see if the file system is still accessible.
* If not, sets abortRequested and stopRequested
*
@@ -1267,10 +1300,14 @@
FileSystem fs = null;
try {
fs = FileSystem.get(this.conf);
- } catch (IOException e) {
+ if (fs != null && !FSUtils.isFileSystemAvailable(fs)) {
+ LOG.fatal("Shutting down HRegionServer: file system not available");
+ this.abortRequested = true;
+ this.stopRequested.set(true);
+ fsOk = false;
+ }
+ } catch (Exception e) {
LOG.error("Failed get of filesystem", e);
- }
- if (fs != null && !FSUtils.isFileSystemAvailable(fs, stopRequested)) {
LOG.fatal("Shutting down HRegionServer: file system not available");
this.abortRequested = true;
this.stopRequested.set(true);
@@ -1303,6 +1340,7 @@
return regionsToCheck;
}
+ /** {@inheritDoc} */
public long getProtocolVersion(final String protocol,
@SuppressWarnings("unused") final long clientVersion)
throws IOException {
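Taken together, the checkOpen() additions give every client-facing RPC the
same fail-fast prologue. A condensed sketch of the resulting pattern (body
abridged; names as in the patch):

    public byte [] get(final Text regionName, final Text row,
        final Text column) throws IOException {
      checkOpen();                    // throws if stopping/aborting or fs down
      requestCount.incrementAndGet();
      try {
        return getRegion(regionName).get(row, column);
      } catch (IOException e) {
        checkFileSystem();
        throw e;
      }
    }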
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (revision 580433)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (working copy)
@@ -71,9 +71,6 @@
/** Used to construct the name of the directory in which a HRegion resides */
static final String HREGIONDIR_PREFIX = "hregion_";
- // TODO: Someone may try to name a column family 'log'. If they
- // do, it will clash with the HREGION log dir subdirectory. FIX.
-
/** Used to construct the name of the log directory for a region server */
static final String HREGION_LOGDIR_NAME = "log";
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (revision 580433)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (working copy)
@@ -19,91 +19,124 @@
*/
package org.apache.hadoop.hbase;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
* HLog stores all the edits to the HStore.
- *
- * It performs logfile-rolling, so external callers are not aware that the
+ *
+ * It performs logfile-rolling, so external callers are not aware that the
* underlying file is being rolled.
*
- * A single HLog is used by several HRegions simultaneously.
- *
- * <p>Each HRegion is identified by a unique long int. HRegions do
+ *
+ * A single HLog is used by several HRegions simultaneously.
+ *
+ * <p>
+ * Each HRegion is identified by a unique long int. HRegions do
* not need to declare themselves before using the HLog; they simply include
- * their HRegion-id in the append or
+ * their HRegion-id in the append or
* completeCacheFlush calls.
*
- * <p>An HLog consists of multiple on-disk files, which have a chronological
- * order. As data is flushed to other (better) on-disk structures, the log
- * becomes obsolete. We can destroy all the log messages for a given
- * HRegion-id up to the most-recent CACHEFLUSH message from that HRegion.
+ * <p>
+ * An HLog consists of multiple on-disk files, which have a chronological order.
+ * As data is flushed to other (better) on-disk structures, the log becomes
+ * obsolete. We can destroy all the log messages for a given HRegion-id up to
+ * the most-recent CACHEFLUSH message from that HRegion.
*
- * <p>It's only practical to delete entire files. Thus, we delete an entire
- * on-disk file F when all of the messages in F have a log-sequence-id that's
- * older (smaller) than the most-recent CACHEFLUSH message for every HRegion
- * that has a message in F.
- *
- * <p>TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs
- * in HDFS is currently flawed. HBase writes edits to logs and to a memcache.
- * The 'atomic' write to the log is meant to serve as insurance against
- * abnormal RegionServer exit: on startup, the log is rerun to reconstruct an
- * HRegion's last wholesome state. But files in HDFS do not 'exist' until they
- * are cleanly closed -- something that will not happen if RegionServer exits
- * without running its 'close'.
+ * <p>
+ * It's only practical to delete entire files. Thus, we delete an entire on-disk
+ * file F when all of the messages in F have a log-sequence-id that's older
+ * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has
+ * a message in F.
+ *
+ * <p>
+ * Synchronized methods can never execute in parallel. However, between the
+ * start of a cache flush and the completion point, appends are allowed but log
+ * rolling is not. To prevent log rolling from taking place during this period,
+ * a separate reentrant lock is used.
+ *
+ * <p>
+ * TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs in
+ * HDFS is currently flawed. HBase writes edits to logs and to a memcache. The
+ * 'atomic' write to the log is meant to serve as insurance against abnormal
+ * RegionServer exit: on startup, the log is rerun to reconstruct an HRegion's
+ * last wholesome state. But files in HDFS do not 'exist' until they are cleanly
+ * closed -- something that will not happen if RegionServer exits without
+ * running its 'close'.
*/
public class HLog implements HConstants {
private static final Log LOG = LogFactory.getLog(HLog.class);
-
+
static final String HLOG_DATFILE = "hlog.dat.";
+
static final Text METACOLUMN = new Text("METACOLUMN:");
+
static final Text METAROW = new Text("METAROW");
FileSystem fs;
+
Path dir;
+
Configuration conf;
+ final long threadWakeFrequency;
+
SequenceFile.Writer writer;
+
TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
- volatile boolean insideCacheFlush = false;
- TreeMap<Text, Long> regionToLastFlush = new TreeMap<Text, Long>();
+ HashMap<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
volatile boolean closed = false;
+
+ private final Integer sequenceLock = new Integer(0);
volatile long logSeqNum = 0;
- long filenum = 0;
- AtomicInteger numEntries = new AtomicInteger(0);
- Integer rollLock = new Integer(0);
+ volatile long filenum = 0;
+ volatile int numEntries = 0;
+
+ // This lock prevents starting a log roll during a cache flush.
+ // synchronized is insufficient because a cache flush spans two method calls.
+ private final Lock cacheFlushLock = new ReentrantLock();
+
/**
- * Split up a bunch of log files, that are no longer being written to,
- * into new files, one per region. Delete the old log files when ready.
+ * Split up a bunch of log files that are no longer being written to, into
+ * new files, one per region. Delete the old log files when finished.
+ *
* @param rootDir Root directory of the HBase instance
- * @param srcDir Directory of log files to split:
- * e.g. ${ROOTDIR}/log_HOST_PORT
+ * @param srcDir Directory of log files to split: e.g.
+ * ${ROOTDIR}/log_HOST_PORT
* @param fs FileSystem
* @param conf HBaseConfiguration
* @throws IOException
*/
static void splitLog(Path rootDir, Path srcDir, FileSystem fs,
- Configuration conf) throws IOException {
- Path logfiles[] = fs.listPaths(new Path[] {srcDir});
+ Configuration conf) throws IOException {
+ Path logfiles[] = fs.listPaths(new Path[] { srcDir });
LOG.info("splitting " + logfiles.length + " log(s) in " +
srcDir.toString());
HashMap<Text, SequenceFile.Writer> logWriters =
new HashMap<Text, SequenceFile.Writer>();
try {
- for(int i = 0; i < logfiles.length; i++) {
+ for (int i = 0; i < logfiles.length; i++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Splitting " + logfiles[i]);
}
@@ -118,7 +151,7 @@
try {
HLogKey key = new HLogKey();
HLogEdit val = new HLogEdit();
- while(in.next(key, val)) {
+ while (in.next(key, val)) {
Text regionName = key.getRegionName();
SequenceFile.Writer w = logWriters.get(regionName);
if (w == null) {
@@ -141,15 +174,15 @@
}
}
} finally {
- for (SequenceFile.Writer w: logWriters.values()) {
+ for (SequenceFile.Writer w : logWriters.values()) {
w.close();
}
}
-
- if(fs.exists(srcDir)) {
- if(! fs.delete(srcDir)) {
+
+ if (fs.exists(srcDir)) {
+ if (!fs.delete(srcDir)) {
LOG.error("Cannot delete: " + srcDir);
- if(! FileUtil.fullyDelete(new File(srcDir.toString()))) {
+ if (!FileUtil.fullyDelete(new File(srcDir.toString()))) {
throw new IOException("Cannot delete: " + srcDir);
}
}
@@ -160,10 +193,10 @@
/**
* Create an edit log at the given dir location.
*
- * You should never have to load an existing log. If there is a log
- * at startup, it should have already been processed and deleted by
- * the time the HLog object is started up.
- *
+ * You should never have to load an existing log. If there is a log at
+ * startup, it should have already been processed and deleted by the time the
+ * HLog object is started up.
+ *
* @param fs
* @param dir
* @param conf
@@ -173,6 +206,7 @@
this.fs = fs;
this.dir = dir;
this.conf = conf;
+ this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
if (fs.exists(dir)) {
throw new IOException("Target HLog directory already exists: " + dir);
@@ -180,115 +214,117 @@
fs.mkdirs(dir);
rollWriter();
}
-
- synchronized void setSequenceNumber(long newvalue) {
- if (newvalue > logSeqNum) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("changing sequence number from " + logSeqNum + " to " +
- newvalue);
+
+ /**
+ * Called by HRegionServer when it opens a new region to ensure that log
+ * sequence numbers are always greater than the latest sequence number of the
+ * region being brought on-line.
+ *
+ * @param newvalue
+ */
+ void setSequenceNumber(long newvalue) {
+ synchronized (sequenceLock) {
+ if (newvalue > logSeqNum) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("changing sequence number from " + logSeqNum + " to " +
+ newvalue);
+ }
+ logSeqNum = newvalue;
}
- logSeqNum = newvalue;
}
}
/**
- * Roll the log writer. That is, start writing log messages to a new file.
+ * Roll the log writer. That is, start writing log messages to a new file.
*
- * The 'rollLock' prevents us from entering rollWriter() more than
- * once at a time.
+ * Because a log cannot be rolled during a cache flush, and a cache flush
+ * spans two method calls, a special lock needs to be obtained so that a cache
+ * flush cannot start while the log is being rolled, and vice versa.
*
- * The 'this' lock limits access to the current writer so
- * we don't append multiple items simultaneously.
- *
+ * Note that this method cannot simply block on the cacheFlushLock: it is
+ * possible that startCacheFlush runs and obtains the cacheFlushLock, and that
+ * this method then starts, taking the lock on this but blocking on the
+ * cacheFlushLock. completeCacheFlush, itself synchronized, would then wait
+ * forever for the lock on this and consequently never release the
+ * cacheFlushLock. Instead, the loop below uses tryLock and, between attempts,
+ * wait(), which releases the lock on this.
+ *
* @throws IOException
*/
- void rollWriter() throws IOException {
- synchronized(rollLock) {
+ synchronized void rollWriter() throws IOException {
+ boolean locked = false;
+ while (!locked && !closed) {
+ if (cacheFlushLock.tryLock()) {
+ locked = true;
+ break;
+ }
+ try {
+ this.wait(threadWakeFrequency);
+ } catch (InterruptedException e) {
+ // ignored: retry tryLock on the next iteration
+ }
+ }
+ if (closed) {
+ if (locked) {
+ cacheFlushLock.unlock();
+ }
+ throw new IOException("Cannot roll log; log is closed");
+ }
- // Try to roll the writer to a new file. We may have to
- // wait for a cache-flush to complete. In the process,
- // compute a list of old log files that can be deleted.
+ // If we get here we have locked out both cache flushes and appends
- Vector<Path> toDeleteList = new Vector<Path>();
- synchronized(this) {
- if(closed) {
- throw new IOException("Cannot roll log; log is closed");
+ try {
+ if (writer != null) {
+ // Close the current writer, get a new one.
+ writer.close();
+ Path p = computeFilename(filenum - 1);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing current log writer " + p.toString() +
+ " to get a new one");
}
-
- // Make sure we do not roll the log while inside a
- // cache-flush. Otherwise, the log sequence number for
- // the CACHEFLUSH operation will appear in a "newer" log file
- // than it should.
- while(insideCacheFlush) {
- try {
- wait();
- } catch (InterruptedException ie) {
- // continue;
- }
- }
-
- // Close the current writer (if any), and grab a new one.
- if(writer != null) {
- writer.close();
- Path p = computeFilename(filenum - 1);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Closing current log writer " + p.toString() +
- " to get a new one");
- }
- if (filenum > 0) {
+ if (filenum > 0) {
+ synchronized (sequenceLock) {
outputfiles.put(logSeqNum - 1, p);
}
}
- Path newPath = computeFilename(filenum++);
- this.writer = SequenceFile.createWriter(fs, conf, newPath,
+ }
+ Path newPath = computeFilename(filenum++);
+ this.writer = SequenceFile.createWriter(fs, conf, newPath,
HLogKey.class, HLogEdit.class);
- if(LOG.isDebugEnabled()) {
- LOG.debug("new log writer created at " + newPath);
- }
-
- // Can we delete any of the old log files?
- // First, compute the oldest relevant log operation
- // over all the regions.
- long oldestOutstandingSeqNum = Long.MAX_VALUE;
- for(Long l: regionToLastFlush.values()) {
- long curSeqNum = l.longValue();
-
- if(curSeqNum < oldestOutstandingSeqNum) {
- oldestOutstandingSeqNum = curSeqNum;
- }
- }
+ LOG.info("new log writer created at " + newPath);
- // Next, remove all files with a final ID that's older
- // than the oldest pending region-operation.
- for(Iterator<Long> it = outputfiles.keySet().iterator(); it.hasNext();) {
- long maxSeqNum = it.next().longValue();
- if(maxSeqNum < oldestOutstandingSeqNum) {
- Path p = outputfiles.get(maxSeqNum);
- it.remove();
- toDeleteList.add(p);
-
- } else {
- break;
- }
+ // Can we delete any of the old log files?
+
+ TreeSet<Long> sequenceNumbers =
+ new TreeSet<Long>(lastSeqWritten.values());
+
+ if (sequenceNumbers.size() > 0) {
+ long oldestOutstandingSeqNum = sequenceNumbers.first();
+
+ // Get the set of all log files whose final ID is older than the oldest
+ // pending region operation
+
+ sequenceNumbers.clear();
+ sequenceNumbers.addAll(outputfiles.headMap(
+ oldestOutstandingSeqNum).keySet());
+
+ // Now remove old log files (if any)
+
+ for (Long seq : sequenceNumbers) {
+ Path p = outputfiles.remove(seq);
+ LOG.info("removing old log file " + p.toString());
+ fs.delete(p);
}
}
+ this.numEntries = 0;
- // Actually delete them, if any!
- for(Iterator<Path> it = toDeleteList.iterator(); it.hasNext(); ) {
- Path p = it.next();
- if(LOG.isDebugEnabled()) {
- LOG.debug("removing old log file " + p.toString());
- }
- fs.delete(p);
- }
- this.numEntries.set(0);
+ } finally {
+ cacheFlushLock.unlock();
}
}
/**
- * This is a convenience method that computes a new filename with
- * a given file-number.
+ * This is a convenience method that computes a new filename with a given
+ * file-number.
*/
Path computeFilename(final long fn) {
return new Path(dir, HLOG_DATFILE + String.format("%1$03d", fn));
@@ -296,19 +332,21 @@
/**
* Shut down the log and delete the log directory
+ *
* @throws IOException
*/
synchronized void closeAndDelete() throws IOException {
close();
fs.delete(dir);
}
-
+
/**
* Shut down the log.
+ *
* @throws IOException
*/
synchronized void close() throws IOException {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("closing log writer in " + this.dir.toString());
}
this.writer.close();
@@ -319,16 +357,19 @@
* Append a set of edits to the log. Log edits are keyed by regionName,
* rowname, and log-sequence-id.
*
- * Later, if we sort by these keys, we obtain all the relevant edits for
- * a given key-range of the HRegion (TODO). Any edits that do not have a
+ * Later, if we sort by these keys, we obtain all the relevant edits for a
+ * given key-range of the HRegion (TODO). Any edits that do not have a
* matching {@link HConstants#COMPLETE_CACHEFLUSH} message can be discarded.
*
- * Logs cannot be restarted once closed, or once the HLog process dies.
- * Each time the HLog starts, it must create a new log. This means that
- * other systems should process the log appropriately upon each startup
- * (and prior to initializing HLog).
+ * <p>
+ * Logs cannot be restarted once closed, or once the HLog process dies. Each
+ * time the HLog starts, it must create a new log. This means that other
+ * systems should process the log appropriately upon each startup (and prior
+ * to initializing HLog).
*
- * We need to seize a lock on the writer so that writes are atomic.
+ * synchronized prevents appends during the completion of a cache flush or for
+ * the duration of a log roll.
+ *
* @param regionName
* @param tableName
* @param row
@@ -337,147 +378,133 @@
* @throws IOException
*/
synchronized void append(Text regionName, Text tableName, Text row,
- TreeMap<Text, byte []> columns, long timestamp)
- throws IOException {
- if(closed) {
+ TreeMap<Text, byte []> columns, long timestamp) throws IOException {
+ if (closed) {
throw new IOException("Cannot append; log is closed");
}
-
+
long seqNum[] = obtainSeqNum(columns.size());
- // The 'regionToLastFlush' map holds the sequence id of the
- // most recent flush for every regionName. However, for regions
- // that don't have any flush yet, the relevant operation is the
- // first one that's been added.
- if (regionToLastFlush.get(regionName) == null) {
- regionToLastFlush.put(regionName, seqNum[0]);
- }
+ // The 'lastSeqWritten' map holds the sequence number of the most recent
+ // write for each region. When the cache is flushed, the entry for the
+ // region being flushed is removed if the sequence number of the flush
+ // is greater than or equal to the value in lastSeqWritten
+ lastSeqWritten.put(regionName, seqNum[seqNum.length - 1]);
+
int counter = 0;
- for (Map.Entry<Text, byte []> es: columns.entrySet()) {
+ for (Map.Entry<Text, byte []> es : columns.entrySet()) {
HLogKey logKey =
new HLogKey(regionName, tableName, row, seqNum[counter++]);
HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp);
writer.append(logKey, logEdit);
- numEntries.getAndIncrement();
+ numEntries++;
}
}
/** @return How many items have been added to the log */
int getNumEntries() {
- return numEntries.get();
+ return numEntries;
}
/**
- * Obtain a log sequence number. This seizes the whole HLog
- * lock, but it shouldn't last too long.
+ * Obtain a log sequence number.
*/
- synchronized long obtainSeqNum() {
- return logSeqNum++;
+ private long obtainSeqNum() {
+ long value;
+ synchronized (sequenceLock) {
+ value = logSeqNum++;
+ }
+ return value;
}
-
+
/**
* Obtain a specified number of sequence numbers
- *
- * @param num - number of sequence numbers to obtain
- * @return - array of sequence numbers
+ *
+ * @param num number of sequence numbers to obtain
+ * @return array of sequence numbers
*/
- synchronized long[] obtainSeqNum(int num) {
+ private long[] obtainSeqNum(int num) {
long[] results = new long[num];
- for (int i = 0; i < num; i++) {
- results[i] = logSeqNum++;
+ synchronized (sequenceLock) {
+ for (int i = 0; i < num; i++) {
+ results[i] = logSeqNum++;
+ }
}
return results;
}
/**
- * By acquiring a log sequence ID, we can allow log messages
- * to continue while we flush the cache.
+ * By acquiring a log sequence ID, we can allow log messages to continue while
+ * we flush the cache.
*
- * Set a flag so that we do not roll the log between the start
- * and complete of a cache-flush. Otherwise the log-seq-id for
- * the flush will not appear in the correct logfile.
+ * Acquire a lock so that we do not roll the log between the start and
+ * completion of a cache-flush. Otherwise the log-seq-id for the flush will
+ * not appear in the correct logfile.
+ *
* @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
* @see #completeCacheFlush(Text, Text, long)
* @see #abortCacheFlush()
*/
- synchronized long startCacheFlush() {
- while (this.insideCacheFlush) {
- try {
- wait();
- } catch (InterruptedException ie) {
- // continue
- }
- }
- this.insideCacheFlush = true;
- notifyAll();
+ long startCacheFlush() {
+ cacheFlushLock.lock();
return obtainSeqNum();
}
- /** Complete the cache flush
+ /**
+ * Complete the cache flush
+ *
+ * Protected by this and cacheFlushLock
+ *
* @param regionName
* @param tableName
* @param logSeqId
* @throws IOException
*/
synchronized void completeCacheFlush(final Text regionName,
- final Text tableName, final long logSeqId)
- throws IOException {
- if(this.closed) {
- return;
- }
-
- if (!this.insideCacheFlush) {
- throw new IOException("Impossible situation: inside " +
- "completeCacheFlush(), but 'insideCacheFlush' flag is false");
- }
- HLogKey key = new HLogKey(regionName, tableName, HLog.METAROW, logSeqId);
- this.writer.append(key,
- new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
- System.currentTimeMillis()));
- this.numEntries.getAndIncrement();
+ final Text tableName, final long logSeqId) throws IOException {
- // Remember the most-recent flush for each region.
- // This is used to delete obsolete log files.
- this.regionToLastFlush.put(regionName, Long.valueOf(logSeqId));
+ try {
+ if (this.closed) {
+ return;
+ }
- cleanup();
+ writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
+ new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
+ System.currentTimeMillis()));
+
+ numEntries++;
+ Long seq = lastSeqWritten.get(regionName);
+ if (seq != null && logSeqId >= seq) {
+ lastSeqWritten.remove(regionName);
+ }
+
+ } finally {
+ cacheFlushLock.unlock();
+ notifyAll(); // wake up the log roller if it is waiting
+ }
}
-
+
/**
- * Abort a cache flush.
- * This method will clear waits on {@link #insideCacheFlush}. Call if the
- * flush fails. Note that the only recovery for an aborted flush currently
- * is a restart of the regionserver so the snapshot content dropped by the
- * failure gets restored to the memcache.
+ * Abort a cache flush. This method releases the cacheFlushLock and wakes up
+ * anyone waiting on it. Call if the flush fails. Note that the only
+ * recovery for an aborted flush currently is a restart of the regionserver so
+ * the snapshot content dropped by the failure gets restored to the memcache.
*/
synchronized void abortCacheFlush() {
- cleanup();
- }
-
- private synchronized void cleanup() {
- this.insideCacheFlush = false;
+ this.cacheFlushLock.unlock();
notifyAll();
}
-
- /**
- * Abort a cache flush.
- * This method will clear waits on {@link #insideCacheFlush} but if this
- * method is called, we are losing data. TODO: Fix.
- */
- synchronized void abort() {
- this.insideCacheFlush = false;
- notifyAll();
- }
private static void usage() {
System.err.println("Usage: java org.apache.hbase.HLog" +
" {--dump ... | --split ...}");
}
-
+
/**
* Pass one or more log file names and it will either dump out a text version
* on stdout or split the specified log files.
+ *
* @param args
* @throws IOException
*/
@@ -490,7 +517,7 @@
if (args[0].compareTo("--dump") != 0) {
if (args[0].compareTo("--split") == 0) {
dump = false;
-
+
} else {
usage();
System.exit(-1);
@@ -499,7 +526,7 @@
Configuration conf = new HBaseConfiguration();
FileSystem fs = FileSystem.get(conf);
Path baseDir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
-
+
for (int i = 1; i < args.length; i++) {
Path logPath = new Path(args[i]);
if (!fs.exists(logPath)) {
@@ -513,7 +540,7 @@
try {
HLogKey key = new HLogKey();
HLogEdit val = new HLogEdit();
- while(log.next(key, val)) {
+ while (log.next(key, val)) {
System.out.println(key.toString() + " " + val.toString());
}
} finally {
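For orientation, a condensed sketch of the caller's side of the new
cacheFlushLock protocol, matching the HRegion.java hunk below (variable names
abridged):

    long seqId = log.startCacheFlush(); // takes cacheFlushLock; rolls blocked
    try {
      for (HStore hstore : stores.values()) {
        hstore.flushCache(snapshot, seqId); // log appends are still allowed
      }
    } catch (IOException e) {
      log.abortCacheFlush();            // releases the lock; restart required
      throw new DroppedSnapshotException(e.getMessage());
    }
    // Writes the CACHEFLUSH marker; releases the lock in its finally block.
    log.completeCacheFlush(regionName, tableName, seqId);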
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (revision 580433)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (working copy)
@@ -504,7 +504,7 @@
LOG.error("Scan ROOT region", e);
if (tries == numRetries - 1) {
// We ran out of tries. Make sure the file system is still available
- if (checkFileSystem()) {
+ if (!checkFileSystem()) {
continue; // Avoid sleeping.
}
}
@@ -654,7 +654,7 @@
if (tries == numRetries - 1) {
// We ran out of tries. Make sure the file system is still
// available
- if (checkFileSystem()) {
+ if (!checkFileSystem()) {
continue; // avoid sleeping
}
}
@@ -941,7 +941,7 @@
*/
protected boolean checkFileSystem() {
if (fsOk) {
- if (!FSUtils.isFileSystemAvailable(fs, closed)) {
+ if (!FSUtils.isFileSystemAvailable(fs)) {
LOG.fatal("Shutting down HBase cluster: file system not available");
closed.set(true);
fsOk = false;
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 580433)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy)
@@ -210,6 +210,7 @@
final int memcacheFlushSize;
final int blockingMemcacheSize;
protected final long threadWakeFrequency;
+ protected final int optionalFlushCount;
private final HLocking lock = new HLocking();
private long desiredMaxFileSize;
private final long maxSequenceId;
@@ -247,6 +248,8 @@
this.regionInfo = regionInfo;
this.memcache = new HMemcache();
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
+ this.optionalFlushCount =
+ conf.getInt("hbase.hregion.memcache.optionalflushcount", 10);
// Declare the regionName. This is a unique string for the region, used to
// build a unique filename.
@@ -728,11 +731,13 @@
void optionallyFlush() throws IOException {
if(this.memcache.getSize() > this.memcacheFlushSize) {
flushcache(false);
- } else if (this.memcache.getSize() > 0 && this.noFlushCount >= 10) {
- LOG.info("Optional flush called " + this.noFlushCount +
- " times when data present without flushing. Forcing one.");
- flushcache(false);
- if (this.memcache.getSize() > 0) {
+ } else if (this.memcache.getSize() > 0) {
+ if (this.noFlushCount >= this.optionalFlushCount) {
+ LOG.info("Optional flush called " + this.noFlushCount +
+ " times when data present without flushing. Forcing one.");
+ flushcache(false);
+
+ } else {
// Only increment if something in the cache.
// Gets zero'd when a flushcache is called.
this.noFlushCount++;
@@ -864,25 +869,31 @@
retval.memcacheSnapshot.size());
}
- // A. Flush memcache to all the HStores.
- // Keep running vector of all store files that includes both old and the
- // just-made new flush store file.
- for (HStore hstore: stores.values()) {
- hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
+ try {
+ // A. Flush memcache to all the HStores.
+ // Keep running vector of all store files that includes both old and the
+ // just-made new flush store file.
+ for (HStore hstore: stores.values()) {
+ hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
+ }
+ } catch (IOException e) {
+ // An exception here means that the snapshot was not persisted.
+ // The hlog needs to be replayed so its content is restored to memcache.
+ // Currently, only a server restart will do this.
+ this.log.abortCacheFlush();
+ throw new DroppedSnapshotException(e.getMessage());
}
+ // If we get to here, the HStores have been written. If we get an
+ // error in completeCacheFlush it will release the lock it is holding
+
// B. Write a FLUSHCACHE-COMPLETE message to the log.
// This tells future readers that the HStores were emitted correctly,
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
this.log.completeCacheFlush(this.regionInfo.regionName,
- regionInfo.tableDesc.getName(), logCacheFlushId);
- } catch (IOException e) {
- // An exception here means that the snapshot was not persisted.
- // The hlog needs to be replayed so its content is restored to memcache.
- // Currently, only a server restart will do this.
- this.log.abortCacheFlush();
- throw new DroppedSnapshotException(e.getMessage());
+ regionInfo.tableDesc.getName(), logCacheFlushId);
+
} finally {
// C. Delete the now-irrelevant memcache snapshot; its contents have been
// dumped to disk-based HStores or, if error, clear aborted snapshot.
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java (revision 580433)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java (working copy)
@@ -26,7 +26,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.dfs.DistributedFileSystem;
/**
@@ -38,48 +37,37 @@
/**
* Not instantiable
*/
- private FSUtils() {super();}
+ private FSUtils() {}
/**
* Checks to see if the specified file system is available
*
* @param fs
- * @param closed Optional flag. If non-null and set, will abort test of
- * filesytem. Presumption is a flag shared by multiple threads. Another
- * may have already determined the filesystem -- or something else -- bad.
* @return true if the specified file system is available.
*/
- public static boolean isFileSystemAvailable(final FileSystem fs,
- final AtomicBoolean closed) {
+ public static boolean isFileSystemAvailable(final FileSystem fs) {
if (!(fs instanceof DistributedFileSystem)) {
return true;
}
+ String exception = "";
boolean available = false;
DistributedFileSystem dfs = (DistributedFileSystem) fs;
- int maxTries = dfs.getConf().getInt("hbase.client.retries.number", 3);
- Path root =
- fs.makeQualified(new Path(dfs.getConf().get(HConstants.HBASE_DIR, "/")));
- for (int i = 0; i < maxTries && (closed == null || !closed.get()); i++) {
- IOException ex = null;
- try {
- if (dfs.exists(root)) {
- available = true;
- break;
- }
- } catch (IOException e) {
- ex = e;
+ try {
+ if (dfs.exists(new Path("/"))) {
+ available = true;
}
- String exception = (ex == null)? "": ": " + ex.getMessage();
- LOG.info("Failed exists test on " + root + " by thread " +
- Thread.currentThread().getName() + " (Attempt " + i + " of " +
- maxTries +"): " + exception);
+ } catch (IOException e) {
+ exception = e.getMessage();
}
+ LOG.info("Failed file system available test. Thread: " +
+ Thread.currentThread().getName() + ": " + exception);
+
try {
if (!available) {
fs.close();
}
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.error("file system close failed: ", e);
}
return available;
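With the AtomicBoolean parameter gone, callers now guard and react themselves.
A condensed sketch of the call sites in this patch (HMaster.checkFileSystem()
and HRegionServer.checkFileSystem()):

    FileSystem fs = FileSystem.get(this.conf);
    if (fs != null && !FSUtils.isFileSystemAvailable(fs)) {
      LOG.fatal("Shutting down: file system not available");
      // The master sets closed; the region server sets stopRequested,
      // abortRequested, and fsOk = false.
    }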