diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 95edd9c..7502530 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -119,6 +119,11 @@ public class HLog implements HConstants, Syncable { private final int flushlogentries; private final AtomicInteger unflushedEntries = new AtomicInteger(0); private final Path oldLogDir; + private final List actionListeners = + new ArrayList(); + + private static Class logWriterClass; + private static Class logReaderClass; public interface Reader { @@ -238,20 +243,39 @@ public class HLog implements HConstants, Syncable { } /** + * HLog creating with a null actions listener. + * + * @param fs filesystem handle + * @param dir path to where hlogs are stored + * @param oldLogDir path to where hlogs are archived + * @param conf configuration to use + * @param listener listerner used to request log rolls + * @throws IOException + */ + public HLog(final FileSystem fs, final Path dir, final Path oldLogDir, + final Configuration conf, final LogRollListener listener) + throws IOException { + this(fs, dir, oldLogDir, conf, listener, null); + } + + /** * Create an edit log at the given dir location. * * You should never have to load an existing log. If there is a log at * startup, it should have already been processed and deleted by the time the * HLog object is started up. * - * @param fs - * @param dir - * @param conf - * @param listener + * @param fs filesystem handle + * @param dir path to where hlogs are stored + * @param oldLogDir path to where hlogs are archived + * @param conf configuration to use + * @param listener listerner used to request log rolls + * @param actionListener optional listener for hlog actions like archiving * @throws IOException */ public HLog(final FileSystem fs, final Path dir, final Path oldLogDir, - final Configuration conf, final LogRollListener listener) + final Configuration conf, final LogRollListener listener, + final LogActionsListener actionListener) throws IOException { super(); this.fs = fs; @@ -282,6 +306,9 @@ public class HLog implements HConstants, Syncable { ", enabled=" + this.enabled + ", flushlogentries=" + this.flushlogentries + ", optionallogflushinternal=" + this.optionalFlushInterval + "ms"); + if (actionListener != null) { + addLogActionsListerner(actionListener); + } rollWriter(); logSyncerThread = new LogSyncer(this.optionalFlushInterval); Threads.setDaemonThreadRunning(logSyncerThread, @@ -361,6 +388,12 @@ public class HLog implements HConstants, Syncable { ", filesize=" + this.fs.getFileStatus(oldFile).getLen() + ". ": "") + "New hlog " + FSUtils.getPath(newPath)); + // Tell our listeners that a new log was created + if (!this.actionListeners.isEmpty()) { + for (LogActionsListener list : this.actionListeners) { + list.logRolled(newPath); + } + } // Can we delete any of the old log files? if (this.outputfiles.size() > 0) { if (this.lastSeqWritten.size() <= 0) { @@ -397,9 +430,12 @@ public class HLog implements HConstants, Syncable { final Path path, Configuration conf) throws IOException { try { - Class c = Class.forName(conf.get("hbase.regionserver.hlog.reader.impl", + if (logReaderClass == null) { + logReaderClass = Class.forName(conf.get("hbase.regionserver.hlog.reader.impl", SequenceFileLogReader.class.getCanonicalName())); - HLog.Reader reader = (HLog.Reader) c.newInstance(); + } + + HLog.Reader reader = (HLog.Reader) logReaderClass.newInstance(); reader.init(fs, path, conf); return reader; } catch (Exception e) { @@ -421,9 +457,11 @@ public class HLog implements HConstants, Syncable { final Path path, Configuration conf) throws IOException { try { - Class c = Class.forName(conf.get("hbase.regionserver.hlog.writer.impl", + if (logWriterClass == null) { + logWriterClass = Class.forName(conf.get("hbase.regionserver.hlog.writer.impl", SequenceFileLogWriter.class.getCanonicalName())); - HLog.Writer writer = (HLog.Writer) c.newInstance(); + } + HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance(); writer.init(fs, path, conf); return writer; } catch (Exception e) { @@ -557,6 +595,11 @@ public class HLog implements HConstants, Syncable { " whose highest sequence/edit id is " + seqno + " to " + FSUtils.getPath(newPath)); this.fs.rename(p, newPath); + if (!this.actionListeners.isEmpty()) { + for (LogActionsListener list : this.actionListeners) { + list.logArchived(p, newPath); + } + } } /** @@ -1384,8 +1427,6 @@ public class HLog implements HConstants, Syncable { return dirName.toString(); } - // We create a new file name with a ts in front of it to make sure we almost - // certainly don't have a file name conflict. private static Path getHLogArchivePath(Path oldLogDir, Path p) { return new Path(oldLogDir, System.currentTimeMillis() + "." + p.getName()); } @@ -1396,6 +1437,23 @@ public class HLog implements HConstants, Syncable { } /** + * + * @param list + */ + public void addLogActionsListerner(LogActionsListener list) { + LOG.info("Adding a listener"); + this.actionListeners.add(list); + } + + /** + * + * @param list + */ + public boolean removeLogActionsListener(LogActionsListener list) { + return this.actionListeners.remove(list); + } + + /** * Pass one or more log file names and it will either dump out a text version * on stdout or split the specified log files. * diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java new file mode 100644 index 0000000..9b18992 --- /dev/null +++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java @@ -0,0 +1,43 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.fs.Path; + +/** + * Interface that defines all actions that can be listened to coming + * from the HLog. The calls are done in sync with what happens over in the + * HLog so make sure your implementation is fast. + */ +public interface LogActionsListener { + + /** + * Notify the listener that a new file is available + * @param newFile the path to the new hlog + */ + public void logRolled(Path newFile); + + /** + * Notify that the following log moved + * @param oldPath the old path + * @param newPath the new path + */ + public void logArchived(Path oldPath, Path newPath); +} diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java new file mode 100644 index 0000000..378a39f --- /dev/null +++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java @@ -0,0 +1,126 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; + +/** + * Test that the actions are called while playing with an HLog + */ +public class TestLogActionsListener { + + protected static final Log LOG = + LogFactory.getLog(TestLogActionsListener.class); + + private final static HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + private final static byte[] SOME_BYTES = Bytes.toBytes("t"); + private static FileSystem fs; + private static Path oldLogDir; + private static Path logDir; + private static Configuration conf; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = TEST_UTIL.getConfiguration(); + conf.setInt("hbase.regionserver.maxlogs", 5); + fs = FileSystem.get(conf); + oldLogDir = new Path(TEST_UTIL.getTestDir(), + HConstants.HREGION_OLDLOGDIR_NAME); + logDir = new Path(TEST_UTIL.getTestDir(), + HConstants.HREGION_LOGDIR_NAME); + } + + @Before + public void setUp() throws Exception { + fs.delete(logDir, true); + fs.delete(oldLogDir, true); + } + + @After + public void tearDown() throws Exception { + setUp(); + } + + /** + * Add a bunch of dummy data and roll the logs every two insert. We + * should end up with 10 rolled files (plus the roll called in + * the constructor). Also test adding a listener while it's running. + */ + @Test + public void testActionListener() throws Exception { + DummyLogActionsListener list = new DummyLogActionsListener(); + DummyLogActionsListener laterList = new DummyLogActionsListener(); + HLog hlog = new HLog(fs, logDir, oldLogDir, conf, null, list); + HRegionInfo hri = new HRegionInfo(new HTableDescriptor(SOME_BYTES), + SOME_BYTES, SOME_BYTES, false); + + for (int i = 0; i < 20; i++) { + byte[] b = Bytes.toBytes(i+""); + KeyValue kv = new KeyValue(b,b,b); + WALEdit edit = new WALEdit(); + edit.add(kv); + HLogKey key = new HLogKey(b,b, 0, 0); + hlog.append(hri, key, edit, false); + if (i == 10) { + hlog.addLogActionsListerner(laterList); + } + if (i % 2 == 0) { + hlog.rollWriter(); + } + } + assertEquals(11, list.logRollCounter); + assertEquals(5, laterList.logRollCounter); + } + + /** + * Just counts when methods are called + */ + static class DummyLogActionsListener implements LogActionsListener { + + public int logRollCounter = 0; + + @Override + public void logRolled(Path newFile) { + logRollCounter++; + } + + @Override + public void logArchived(Path oldPath, Path newPath) { + // This one is a bit tricky to test since it involves seq numbers + } + } +}