Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (revision 0) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (revision 0) @@ -0,0 +1,200 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.dfs.MiniDFSCluster; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +/** + * Test log deletion as logs are rolled. 
+ */ +public class TestLogRolling extends HBaseTestCase { + private static final Log LOG = LogFactory.getLog(TestLogRolling.class); + private MiniDFSCluster dfs; + private MiniHBaseCluster cluster; + private Path logdir; + private String tableName; + private byte[] value; + + /** + * constructor + * @throws Exception + */ + public TestLogRolling() throws Exception { + super(); + try { + this.dfs = null; + this.cluster = null; + this.logdir = null; + this.tableName = null; + this.value = null; + + // We roll the log after every 256 writes + conf.setInt("hbase.regionserver.maxlogentries", 256); + + // For less frequently updated regions flush after every 2 flushes + conf.setInt("hbase.hregion.memcache.optionalflushcount", 2); + + // We flush the cache after every 8192 bytes + conf.setInt("hbase.hregion.memcache.flush.size", 8192); + + // Make lease timeout longer, lease checks less frequent + conf.setInt("hbase.master.lease.period", 10 * 1000); + conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000); + + // Increase the amount of time between client retries + conf.setLong("hbase.client.pause", 15 * 1000); + + String className = this.getClass().getName(); + StringBuilder v = new StringBuilder(className); + while (v.length() < 1000) { + v.append(className); + } + value = v.toString().getBytes(HConstants.UTF8_ENCODING); + + } catch (Exception e) { + LOG.fatal("error in constructor", e); + throw e; + } + } + + /** {@inheritDoc} */ + @Override + public void setUp() throws Exception { + try { + super.setUp(); + dfs = new MiniDFSCluster(conf, 2, true, (String[]) null); + } catch (Exception e) { + LOG.fatal("error during setUp: ", e); + throw e; + } + } + + /** {@inheritDoc} */ + @Override + public void tearDown() throws Exception { + try { + super.tearDown(); + + if (cluster != null) { // shutdown mini HBase cluster + cluster.shutdown(); + } + + if (dfs != null) { + FileSystem fs = dfs.getFileSystem(); + try { + dfs.shutdown(); + } finally { + if (fs != null) { + 
fs.close(); + } + } + } + } catch (Exception e) { + LOG.fatal("error in tearDown", e); + throw e; + } + } + + private void startAndWriteData() throws Exception { + cluster = new MiniHBaseCluster(conf, 1, dfs); + try { + Thread.sleep(10 * 1000); // Wait for region server to start + } catch (InterruptedException e) { + } + + logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir; + + // When the META table can be opened, the region servers are running + @SuppressWarnings("unused") + HTable meta = new HTable(conf, HConstants.META_TABLE_NAME); + + // Create the test table and open it + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString())); + HBaseAdmin admin = new HBaseAdmin(conf); + admin.createTable(desc); + HTable table = new HTable(conf, new Text(tableName)); + + for (int i = 1; i <= 2048; i++) { // 2048 writes should cause 8 log rolls + long lockid = + table.startUpdate(new Text("row" + String.format("%1$04d", i))); + table.put(lockid, HConstants.COLUMN_FAMILY, value); + table.commit(lockid); + + if (i % 256 == 0) { + // After every 256 writes sleep to let the log roller run + + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + } + } + } + } + + private int countLogFiles(boolean print) throws IOException { + Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {logdir}); + if (print) { + for (int i = 0; i < logfiles.length; i++) { + if (LOG.isDebugEnabled()) { + LOG.debug("logfile: " + logfiles[i].toString()); + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("number of log files: " + logfiles.length); + } + return logfiles.length; + } + + /** + * Tests that logs are deleted + * + * @throws Exception + */ + public void testLogRolling() throws Exception { + tableName = getName(); + // Force a region split after every 768KB + conf.setLong("hbase.hregion.max.filesize", 768L * 1024L); + try { + startAndWriteData(); + LOG.info("Finished writing. 
Sleeping to let cache flusher and log roller run"); + try { + // Wait for log roller and cache flusher to run a few times... + Thread.sleep(30L * 1000L); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted", e); + } + LOG.info("Wake from sleep"); + assertTrue(countLogFiles(true) <= 2); + } catch (Exception e) { + LOG.fatal("unexpected exception", e); + throw e; + } + } + +} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (revision 580788) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (working copy) @@ -19,91 +19,124 @@ */ package org.apache.hadoop.hbase; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.Reader; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.conf.*; -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; - /** * HLog stores all the edits to the HStore. - * - * It performs logfile-rolling, so external callers are not aware that the + * + * It performs logfile-rolling, so external callers are not aware that the * underlying file is being rolled. * - *
A single HLog is used by several HRegions simultaneously. - * - *
Each HRegion is identified by a unique long int. HRegions do
+ *
+ * A single HLog is used by several HRegions simultaneously. + * + *
+ * Each HRegion is identified by a unique long int. HRegions do
* not need to declare themselves before using the HLog; they simply include
- * their HRegion-id in the append or
+ * their HRegion-id in the append or
* completeCacheFlush calls.
*
- *
An HLog consists of multiple on-disk files, which have a chronological - * order. As data is flushed to other (better) on-disk structures, the log - * becomes obsolete. We can destroy all the log messages for a given - * HRegion-id up to the most-recent CACHEFLUSH message from that HRegion. + *
+ * An HLog consists of multiple on-disk files, which have a chronological order. + * As data is flushed to other (better) on-disk structures, the log becomes + * obsolete. We can destroy all the log messages for a given HRegion-id up to + * the most-recent CACHEFLUSH message from that HRegion. * - *
It's only practical to delete entire files. Thus, we delete an entire - * on-disk file F when all of the messages in F have a log-sequence-id that's - * older (smaller) than the most-recent CACHEFLUSH message for every HRegion - * that has a message in F. - * - *
TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs - * in HDFS is currently flawed. HBase writes edits to logs and to a memcache. - * The 'atomic' write to the log is meant to serve as insurance against - * abnormal RegionServer exit: on startup, the log is rerun to reconstruct an - * HRegion's last wholesome state. But files in HDFS do not 'exist' until they - * are cleanly closed -- something that will not happen if RegionServer exits - * without running its 'close'. + *
+ * It's only practical to delete entire files. Thus, we delete an entire on-disk + * file F when all of the messages in F have a log-sequence-id that's older + * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has + * a message in F. + * + *
+ * Synchronized methods can never execute in parallel. However, between the + * start of a cache flush and the completion point, appends are allowed but log + * rolling is not. To prevent log rolling taking place during this period, a + * separate reentrant lock is used. + * + *
+ * TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs in
+ * HDFS is currently flawed. HBase writes edits to logs and to a memcache. The
+ * 'atomic' write to the log is meant to serve as insurance against abnormal
+ * RegionServer exit: on startup, the log is rerun to reconstruct an HRegion's
+ * last wholesome state. But files in HDFS do not 'exist' until they are cleanly
+ * closed -- something that will not happen if RegionServer exits without
+ * running its 'close'.
*/
public class HLog implements HConstants {
private static final Log LOG = LogFactory.getLog(HLog.class);
-
+
static final String HLOG_DATFILE = "hlog.dat.";
+
static final Text METACOLUMN = new Text("METACOLUMN:");
+
static final Text METAROW = new Text("METAROW");
FileSystem fs;
+
Path dir;
+
Configuration conf;
+ final long threadWakeFrequency;
+
SequenceFile.Writer writer;
+
TreeMap Logs cannot be restarted once closed, or once the HLog process dies.
- * Each time the HLog starts, it must create a new log. This means that
- * other systems should process the log appropriately upon each startup
- * (and prior to initializing HLog).
+ *
+ * Logs cannot be restarted once closed, or once the HLog process dies. Each
+ * time the HLog starts, it must create a new log. This means that other
+ * systems should process the log appropriately upon each startup (and prior
+ * to initializing HLog).
*
- * We need to seize a lock on the writer so that writes are atomic.
+ * This method is synchronized so that no append can take place during the
+ * completion of a cache flush or for the duration of a log roll.
+ *
* @param regionName
* @param tableName
* @param row
@@ -337,147 +378,133 @@
* @throws IOException
*/
synchronized void append(Text regionName, Text tableName, Text row,
- TreeMap${ROOTDIR}/log_HOST_PORT
+ * @param srcDir Directory of log files to split: e.g.
+ * ${ROOTDIR}/log_HOST_PORT
* @param fs FileSystem
* @param conf HBaseConfiguration
* @throws IOException
*/
static void splitLog(Path rootDir, Path srcDir, FileSystem fs,
- Configuration conf) throws IOException {
- Path logfiles[] = fs.listPaths(new Path[] {srcDir});
+ Configuration conf) throws IOException {
+ Path logfiles[] = fs.listPaths(new Path[] { srcDir });
LOG.info("splitting " + logfiles.length + " log(s) in " +
srcDir.toString());
HashMapdir location.
*
- * You should never have to load an existing log. If there is a log
- * at startup, it should have already been processed and deleted by
- * the time the HLog object is started up.
- *
+ * You should never have to load an existing log. If there is a log at
+ * startup, it should have already been processed and deleted by the time the
+ * HLog object is started up.
+ *
* @param fs
* @param dir
* @param conf
@@ -173,6 +206,7 @@
this.fs = fs;
this.dir = dir;
this.conf = conf;
+ this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
if (fs.exists(dir)) {
throw new IOException("Target HLog directory already exists: " + dir);
@@ -180,115 +214,117 @@
fs.mkdirs(dir);
rollWriter();
}
-
- synchronized void setSequenceNumber(long newvalue) {
- if (newvalue > logSeqNum) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("changing sequence number from " + logSeqNum + " to " +
- newvalue);
+
+ /**
+ * Called by HRegionServer when it opens a new region to ensure that log
+ * sequence numbers are always greater than the latest sequence number of the
+ * region being brought on-line.
+ *
+ * @param newvalue new sequence number; ignored unless greater than current
+ */
+ void setSequenceNumber(long newvalue) {
+ synchronized (sequenceLock) {
+ if (newvalue > logSeqNum) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("changing sequence number from " + logSeqNum + " to " +
+ newvalue);
+ }
+ logSeqNum = newvalue;
}
- logSeqNum = newvalue;
}
}
/**
- * Roll the log writer. That is, start writing log messages to a new file.
+ * Roll the log writer. That is, start writing log messages to a new file.
*
- * The 'rollLock' prevents us from entering rollWriter() more than
- * once at a time.
+ * Because a log cannot be rolled during a cache flush, and a cache flush
+ * spans two method calls, a special lock needs to be obtained so that a cache
+ * flush cannot start when the log is being rolled and the log cannot be
+ * rolled during a cache flush.
*
- * The 'this' lock limits access to the current writer so
- * we don't append multiple items simultaneously.
- *
+ * Note that this method must not block on the cacheFlushLock while holding
+ * the lock on this: startCacheFlush could obtain the cacheFlushLock, this
+ * method would then block while holding the lock on this, and
+ * completeCacheFlush would wait for the lock on this and never release the
+ * cacheFlushLock. It polls tryLock instead; wait() releases this between tries.
+ *
* @throws IOException
*/
- void rollWriter() throws IOException {
- synchronized(rollLock) {
+ synchronized void rollWriter() throws IOException {
+ boolean locked = false;
+ while (!locked && !closed) {
+ if (cacheFlushLock.tryLock()) {
+ locked = true;
+ break;
+ }
+ try {
+ this.wait(threadWakeFrequency);
+ } catch (InterruptedException e) {
+ }
+ }
+ if (closed) {
+ if (locked) {
+ cacheFlushLock.unlock();
+ }
+ throw new IOException("Cannot roll log; log is closed");
+ }
- // Try to roll the writer to a new file. We may have to
- // wait for a cache-flush to complete. In the process,
- // compute a list of old log files that can be deleted.
+ // If we get here we have locked out both cache flushes and appends
- Vectorstdout or split the specified log files.
+ *
* @param args
* @throws IOException
*/
@@ -490,7 +517,7 @@
if (args[0].compareTo("--dump") != 0) {
if (args[0].compareTo("--split") == 0) {
dump = false;
-
+
} else {
usage();
System.exit(-1);
@@ -499,7 +526,7 @@
Configuration conf = new HBaseConfiguration();
FileSystem fs = FileSystem.get(conf);
Path baseDir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
-
+
for (int i = 1; i < args.length; i++) {
Path logPath = new Path(args[i]);
if (!fs.exists(logPath)) {
@@ -513,7 +540,7 @@
try {
HLogKey key = new HLogKey();
HLogEdit val = new HLogEdit();
- while(log.next(key, val)) {
+ while (log.next(key, val)) {
System.out.println(key.toString() + " " + val.toString());
}
} finally {
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 580788)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy)
@@ -210,6 +210,7 @@
final int memcacheFlushSize;
final int blockingMemcacheSize;
protected final long threadWakeFrequency;
+ protected final int optionalFlushCount;
private final HLocking lock = new HLocking();
private long desiredMaxFileSize;
private final long maxSequenceId;
@@ -247,6 +248,8 @@
this.regionInfo = regionInfo;
this.memcache = new HMemcache();
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
+ this.optionalFlushCount =
+ conf.getInt("hbase.hregion.memcache.optionalflushcount", 10);
// Declare the regionName. This is a unique string for the region, used to
// build a unique filename.
@@ -728,11 +731,13 @@
void optionallyFlush() throws IOException {
if(this.memcache.getSize() > this.memcacheFlushSize) {
flushcache(false);
- } else if (this.memcache.getSize() > 0 && this.noFlushCount >= 10) {
- LOG.info("Optional flush called " + this.noFlushCount +
- " times when data present without flushing. Forcing one.");
- flushcache(false);
- if (this.memcache.getSize() > 0) {
+ } else if (this.memcache.getSize() > 0) {
+ if (this.noFlushCount >= this.optionalFlushCount) {
+ LOG.info("Optional flush called " + this.noFlushCount +
+ " times when data present without flushing. Forcing one.");
+ flushcache(false);
+
+ } else {
// Only increment if something in the cache.
// Gets zero'd when a flushcache is called.
this.noFlushCount++;
@@ -864,25 +869,31 @@
retval.memcacheSnapshot.size());
}
- // A. Flush memcache to all the HStores.
- // Keep running vector of all store files that includes both old and the
- // just-made new flush store file.
- for (HStore hstore: stores.values()) {
- hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
+ try {
+ // A. Flush memcache to all the HStores.
+ // Keep running vector of all store files that includes both old and the
+ // just-made new flush store file.
+ for (HStore hstore: stores.values()) {
+ hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
+ }
+ } catch (IOException e) {
+ // An exception here means that the snapshot was not persisted.
+ // The hlog needs to be replayed so its content is restored to memcache.
+ // Currently, only a server restart will do this.
+ this.log.abortCacheFlush();
+ throw new DroppedSnapshotException(e.getMessage());
}
+ // If we get to here, the HStores have been written. If we get an
+ // error in completeCacheFlush it will release the lock it is holding
+
// B. Write a FLUSHCACHE-COMPLETE message to the log.
// This tells future readers that the HStores were emitted correctly,
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
this.log.completeCacheFlush(this.regionInfo.regionName,
- regionInfo.tableDesc.getName(), logCacheFlushId);
- } catch (IOException e) {
- // An exception here means that the snapshot was not persisted.
- // The hlog needs to be replayed so its content is restored to memcache.
- // Currently, only a server restart will do this.
- this.log.abortCacheFlush();
- throw new DroppedSnapshotException(e.getMessage());
+ regionInfo.tableDesc.getName(), logCacheFlushId);
+
} finally {
// C. Delete the now-irrelevant memcache snapshot; its contents have been
// dumped to disk-based HStores or, if error, clear aborted snapshot.