Index: core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java =================================================================== --- core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java (revision 0) +++ core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java (revision 0) @@ -0,0 +1,126 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; + +/** + * Test that the actions are called while playing with an HLog + */ +public class TestLogActionsListener { + + protected static final Log LOG = + LogFactory.getLog(TestLogActionsListener.class); + + private final static HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + private final static byte[] SOME_BYTES = Bytes.toBytes("t"); + private static FileSystem fs; + private static Path oldLogDir; + private static Path logDir; + private static Configuration conf; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = TEST_UTIL.getConfiguration(); + conf.setInt("hbase.regionserver.maxlogs", 5); + fs = FileSystem.get(conf); + oldLogDir = new Path(TEST_UTIL.getTestDir(), + HConstants.HREGION_OLDLOGDIR_NAME); + logDir = new Path(TEST_UTIL.getTestDir(), + HConstants.HREGION_LOGDIR_NAME); + } + + @Before + public void setUp() throws Exception { + fs.delete(logDir, true); + fs.delete(oldLogDir, true); + } + + @After + public void tearDown() throws Exception { + setUp(); + } + + /** + * Add a bunch of dummy data and roll the logs every two insert. We + * should end up with 10 rolled files (plus the roll called in + * the constructor). Also test adding a listener while it's running. + */ + @Test + public void testActionListener() throws Exception { + DummyLogActionsListener list = new DummyLogActionsListener(); + DummyLogActionsListener laterList = new DummyLogActionsListener(); + HLog hlog = new HLog(fs, logDir, oldLogDir, conf, null, list); + HRegionInfo hri = new HRegionInfo(new HTableDescriptor(SOME_BYTES), + SOME_BYTES, SOME_BYTES, false); + + for (int i = 0; i < 20; i++) { + byte[] b = Bytes.toBytes(i+""); + KeyValue kv = new KeyValue(b,b,b); + WALEdit edit = new WALEdit(); + edit.add(kv); + HLogKey key = new HLogKey(b,b, 0, 0); + hlog.append(hri, key, edit); + if (i == 10) { + hlog.addLogActionsListerner(laterList); + } + if (i % 2 == 0) { + hlog.rollWriter(); + } + } + assertEquals(11, list.logRollCounter); + assertEquals(5, laterList.logRollCounter); + } + + /** + * Just counts when methods are called + */ + static class DummyLogActionsListener implements LogActionsListener { + + public int logRollCounter = 0; + + @Override + public void logRolled(Path newFile) { + logRollCounter++; + } + + @Override + public void logArchived(Path oldPath, Path newPath) { + // This one is a bit tricky to test since it involves seq numbers + } + } +} Index: core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java =================================================================== --- core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java (revision 0) +++ core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java (revision 0) @@ -0,0 +1,43 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.fs.Path; + +/** + * Interface that defines all actions that can be listened to coming + * from the HLog. The calls are done in sync with what happens over in the + * HLog so make sure your implementation is fast. + */ +public interface LogActionsListener { + + /** + * Notify the listener that a new file is available + * @param newFile the path to the new hlog + */ + public void logRolled(Path newFile); + + /** + * Notify that the following log moved + * @param oldPath the old path + * @param newPath the new path + */ + public void logArchived(Path oldPath, Path newPath); +} Index: core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 945962) +++ core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -125,7 +125,12 @@ private final int flushlogentries; private final AtomicInteger unflushedEntries = new AtomicInteger(0); private final Path oldLogDir; + private final List actionListeners = + new ArrayList(); + private static Class logWriterClass; + private static Class logReaderClass; + private OutputStream hdfs_out; // OutputStream associated with the current SequenceFile.writer private int initialReplication; // initial replication factor of SequenceFile.writer private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas @@ -245,20 +250,39 @@ } /** + * HLog creating with a null actions listener. + * + * @param fs filesystem handle + * @param dir path to where hlogs are stored + * @param oldLogDir path to where hlogs are archived + * @param conf configuration to use + * @param listener listerner used to request log rolls + * @throws IOException + */ + public HLog(final FileSystem fs, final Path dir, final Path oldLogDir, + final Configuration conf, final LogRollListener listener) + throws IOException { + this(fs, dir, oldLogDir, conf, listener, null); + } + + /** * Create an edit log at the given dir location. * * You should never have to load an existing log. If there is a log at * startup, it should have already been processed and deleted by the time the * HLog object is started up. * - * @param fs - * @param dir - * @param conf - * @param listener + * @param fs filesystem handle + * @param dir path to where hlogs are stored + * @param oldLogDir path to where hlogs are archived + * @param conf configuration to use + * @param listener listerner used to request log rolls + * @param actionListener optional listener for hlog actions like archiving * @throws IOException */ public HLog(final FileSystem fs, final Path dir, final Path oldLogDir, - final Configuration conf, final LogRollListener listener) + final Configuration conf, final LogRollListener listener, + final LogActionsListener actionListener) throws IOException { super(); this.fs = fs; @@ -289,6 +313,9 @@ ", enabled=" + this.enabled + ", flushlogentries=" + this.flushlogentries + ", optionallogflushinternal=" + this.optionalFlushInterval + "ms"); + if (actionListener != null) { + addLogActionsListerner(actionListener); + } // rollWriter sets this.hdfs_out if it can. rollWriter(); @@ -406,6 +433,12 @@ ", filesize=" + this.fs.getFileStatus(oldFile).getLen() + ". ": "") + "New hlog " + FSUtils.getPath(newPath)); + // Tell our listeners that a new log was created + if (!this.actionListeners.isEmpty()) { + for (LogActionsListener list : this.actionListeners) { + list.logRolled(newPath); + } + } // Can we delete any of the old log files? if (this.outputfiles.size() > 0) { if (this.lastSeqWritten.size() <= 0) { @@ -442,9 +475,12 @@ final Path path, Configuration conf) throws IOException { try { - Class c = Class.forName(conf.get("hbase.regionserver.hlog.reader.impl", + if (logReaderClass == null) { + logReaderClass = Class.forName(conf.get("hbase.regionserver.hlog.reader.impl", SequenceFileLogReader.class.getCanonicalName())); - HLog.Reader reader = (HLog.Reader) c.newInstance(); + } + + HLog.Reader reader = (HLog.Reader) logReaderClass.newInstance(); reader.init(fs, path, conf); return reader; } catch (Exception e) { @@ -466,9 +502,11 @@ final Path path, Configuration conf) throws IOException { try { - Class c = Class.forName(conf.get("hbase.regionserver.hlog.writer.impl", + if (logWriterClass == null) { + logWriterClass = Class.forName(conf.get("hbase.regionserver.hlog.writer.impl", SequenceFileLogWriter.class.getCanonicalName())); - HLog.Writer writer = (HLog.Writer) c.newInstance(); + } + HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance(); writer.init(fs, path, conf); return writer; } catch (Exception e) { @@ -602,6 +640,11 @@ " whose highest sequence/edit id is " + seqno + " to " + FSUtils.getPath(newPath)); this.fs.rename(p, newPath); + if (!this.actionListeners.isEmpty()) { + for (LogActionsListener list : this.actionListeners) { + list.logArchived(p, newPath); + } + } } /** @@ -1621,8 +1664,6 @@ return dirName.toString(); } - // We create a new file name with a ts in front of it to make sure we almost - // certainly don't have a file name conflict. private static Path getHLogArchivePath(Path oldLogDir, Path p) { return new Path(oldLogDir, System.currentTimeMillis() + "." + p.getName()); } @@ -1633,6 +1674,23 @@ } /** + * + * @param list + */ + public void addLogActionsListerner(LogActionsListener list) { + LOG.info("Adding a listener"); + this.actionListeners.add(list); + } + + /** + * + * @param list + */ + public boolean removeLogActionsListener(LogActionsListener list) { + return this.actionListeners.remove(list); + } + + /** * Pass one or more log file names and it will either dump out a text version * on stdout or split the specified log files. * Index: core/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java =================================================================== --- core/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java (revision 945963) +++ core/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java (working copy) @@ -38,3 +38,4 @@ */ public boolean isLogDeletable(Path filePath); } +