Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (revision 1403640) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (revision ) @@ -356,6 +356,9 @@ throw new IOException("Unable to mkdir " + this.oldLogDir); } } + + coprocessorHost = new WALCoprocessorHost(this, conf); + // rollWriter sets this.hdfs_out if it can. rollWriter(); @@ -364,7 +367,6 @@ Threads.setDaemonThreadRunning(logSyncerThread.getThread(), Thread.currentThread().getName() + ".logSyncer"); - coprocessorHost = new WALCoprocessorHost(this, conf); } // use reflection to search for getDefaultBlockSize(Path f) @@ -532,6 +534,9 @@ } } + // Tell the coprocessors that a new log was created + coprocessorHost.postLogRoll(oldPath, newPath); + // Can we delete any of the old log files? if (this.outputfiles.size() > 0) { if (this.lastSeqWritten.isEmpty()) { @@ -716,6 +721,9 @@ i.postLogArchive(p, newPath); } } + + // Tell the coprocessors that a log has been archived. + coprocessorHost.postLogArchive(p, newPath); } /** @@ -764,6 +772,9 @@ i.postLogArchive(file.getPath(), p); } } + + // Tell the coprocessors that a log has been archived. + coprocessorHost.postLogArchive(file.getPath(), p); } LOG.debug("Moved " + files.length + " log files to " + FSUtils.getPath(this.oldLogDir)); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java (revision 1403640) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java (revision ) @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; @@ -44,7 +45,7 @@ /** * Called before a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit} - * is writen to WAL. + * is written to WAL. * * @param ctx * @param info @@ -58,7 +59,7 @@ /** * Called after a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit} - * is writen to WAL. + * is written to WAL. * * @param ctx * @param info @@ -68,4 +69,23 @@ */ void postWALWrite(ObserverContext ctx, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException; + + /** + * Called after the WAL has been rolled. The oldPath can be null if this is + * the first log file from the regionserver. + * + * @param oldPath the path to the old hlog + * @param newPath the path to the new hlog + */ + void postLogRoll(ObserverContext ctx, + Path oldPath, Path newPath) throws IOException; + + /** + * The WAL has been archived. + * + * @param oldPath the path to the old hlog + * @param newPath the path to the new hlog + */ + void postLogArchive(ObserverContext ctx, + Path oldPath, Path newPath) throws IOException; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java (revision 1403640) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java (revision ) @@ -22,6 +22,7 @@ import java.io.IOException; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.coprocessor.*; @@ -132,4 +133,37 @@ } } } + + public void postLogRoll(Path oldPath, Path newPath) + throws IOException { + ObserverContext ctx = null; + for (WALEnvironment env: coprocessors) { + if (env.getInstance() instanceof + org.apache.hadoop.hbase.coprocessor.WALObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()). + postLogRoll(ctx, oldPath, newPath); + if (ctx.shouldComplete()) { + break; + } + } + } + } + + public void postLogArchive(Path oldPath, Path newPath) + throws IOException { + ObserverContext ctx = null; + for (WALEnvironment env: coprocessors) { + if (env.getInstance() instanceof + org.apache.hadoop.hbase.coprocessor.WALObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()). + postLogArchive(ctx, oldPath, newPath); + if (ctx.shouldComplete()) { + break; + } + } + } + } + } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java (revision 1403640) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java (revision ) @@ -294,6 +294,25 @@ assertNotNull(getCoprocessor(log)); } + /** + * Test that newly created HLog is reported to WALObserver. + */ + @Test + public void testWALObserverPostLogRollNewHLog() throws Exception { + + HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE)); + + Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE)); + deleteDir(basedir); + fs.mkdirs(new Path(basedir, hri.getEncodedName())); + + HLog log = HLogFactory.createHLog(this.fs, hbaseRootDir, + TestWALObserver.class.getName(), this.conf); + SampleRegionWALObserver cp = getCoprocessor(log); + + assertTrue(cp.isPostLogRollCalled()); + } + private SampleRegionWALObserver getCoprocessor(HLog wal) throws Exception { WALCoprocessorHost host = wal.getCoprocessorHost(); Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName()); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java (revision 1403640) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java (revision ) @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; @@ -36,7 +37,7 @@ * * It will monitor WAL writing and restoring, and modify passed-in WALEdit, i.e, * ignore specified columns when writing, or add a KeyValue. On the other - * side, it checks whether the ignored column is still in WAL when Restoreed + * side, it checks whether the ignored column is still in WAL when Restored * at region reconstruct. */ public class SampleRegionWALObserver extends BaseRegionObserver @@ -57,6 +58,8 @@ private boolean postWALWriteCalled = false; private boolean preWALRestoreCalled = false; private boolean postWALRestoreCalled = false; + private boolean postLogRollCalled = false; + private boolean postLogArchiveCalled = false; /** * Set values: with a table name, a column name which will be ignored, and @@ -118,6 +121,16 @@ return bypass; } + @Override + public void postLogRoll(ObserverContext ctx, Path oldPath, Path newPath) throws IOException { + postLogRollCalled = true; + } + + @Override + public void postLogArchive(ObserverContext ctx, Path oldPath, Path newPath) throws IOException { + postLogArchiveCalled = true; + } + /** * Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is * Restoreed. @@ -156,5 +169,17 @@ LOG.debug(SampleRegionWALObserver.class.getName() + ".isPostWALRestoreCalled is called."); return postWALRestoreCalled; + } + + public boolean isPostLogRollCalled() { + LOG.debug(SampleRegionWALObserver.class.getName() + + ".isPostLogRollCalled is called."); + return postLogRollCalled; + } + + public boolean isPostLogArchiveCalled() { + LOG.debug(SampleRegionWALObserver.class.getName() + + ".isPostLogArchiveCalled is called."); + return postLogArchiveCalled; } }