Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1174401) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -802,6 +802,8 @@ k[1] = b2; k[2] = b3; Put put = new Put(k); + put.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("hlogmeta")); put.add(f, null, k); t.put(put); rowCount++; @@ -953,6 +955,8 @@ HRegionInfo hri = new HRegionInfo(table.getTableName(), startKeys[i], startKeys[j]); Put put = new Put(hri.getRegionName()); + put.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("hlogmeta")); put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, Writables.getBytes(hri)); meta.put(put); @@ -1057,6 +1061,9 @@ HRegionInfo hri = new HRegionInfo(htd.getName(), startKeys[i], startKeys[j]); Put put = new Put(hri.getRegionName()); + put.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("hlogmeta")); + put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, Writables.getBytes(hri)); meta.put(put); Index: src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java (revision 1174401) +++ src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java (working copy) @@ -117,6 +117,8 @@ HTable table2 = htu2.createTable(TABLE_NAME, FAM_NAME); Put put = new Put(ROW); + put.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("hlogmeta")); put.add(FAM_NAME, QUAL_NAME, VALUE); table1.put(put); Index: src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java (revision 1174401) +++ 
src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java (working copy) @@ -248,6 +248,8 @@ HRegion region = createNewHRegion(desc, startKey, endKey); byte [] keyToWrite = startKey == null ? Bytes.toBytes("row_000") : startKey; Put put = new Put(keyToWrite); + put.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("hlogmeta")); put.add(FAMILY_NAME, null, Bytes.toBytes("test")); region.put(put); region.close(); Index: src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java (revision 1174401) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java (working copy) @@ -353,6 +353,11 @@ } else { put.add(split[0], split[1], t); } + // add a walmetadata with each operation. This is a + // negative test to ensure that no HBase operation + // breaks when wal metadata is present. + put.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("hlogmeta")); updater.put(put); count++; } catch (RuntimeException ex) { Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogMetadataWALObserver.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogMetadataWALObserver.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogMetadataWALObserver.java (revision 0) @@ -0,0 +1,160 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import java.util.List; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.WALObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment; + +/** + * Class for testing metadata values in HLog. + */ +public class HLogMetadataWALObserver extends BaseRegionObserver +implements WALObserver { + + private static final Log LOG = LogFactory.getLog(HLogMetadataWALObserver.class); + + private byte[] tableName; + private byte[] row; + + private boolean preWALWriteCalled = false; + private boolean postWALWriteCalled = false; + private boolean preWALRestoreCalled = false; + private boolean postWALRestoreCalled = false; + + private byte[] lastMetaBuffer = null; + private int numMetaBuffers = 0; + + /** + * Set values: with a table name, a column name which will be ignored, and + * a column name which will be added to WAL. 
+ */ + public void setTestValues(byte[] tableName, byte[] row, byte[] igf, byte[] igq, + byte[] chf, byte[] chq, byte[] addf, byte[] addq) { + this.row = row; + this.tableName = tableName; + } + + @Override + public void postWALWrite(ObserverContext env, + HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { + postWALWriteCalled = true; + + // check table name matches or not. + if (!Arrays.equals(HRegionInfo.getTableName(info.getRegionName()), this.tableName)) { + return; + } + preWALWriteCalled = true; + // scan all the entries in this transaction + List kvs = logEdit.getKeyValues(); + KeyValue deletedKV = null; + for (KeyValue kv : kvs) { + byte[] family = kv.getFamily(); + byte[] qualifier = kv.getQualifier(); + if (HLog.isMetaRow(kv)) { + lastMetaBuffer = kv.getValue(); + numMetaBuffers++; + } + } + } + + /** + * Returns the HLogMetadata that was last inserted into the HLog. + */ + byte[] getLastMetaBuffer() { + return lastMetaBuffer; + } + + /** + * Returns the number of metadata entries encountered so far. + */ + int getNumMetaBuffer() { + return numMetaBuffers; + } + + /** + * Resets HLogMetadata + */ + void cleanLastMetaBuffer() { + lastMetaBuffer = null; + numMetaBuffers = 0; + preWALWriteCalled = false; + postWALWriteCalled = false; + preWALRestoreCalled = false; + postWALRestoreCalled = false; + } + + @Override + public boolean preWALWrite(ObserverContext env, + HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { + preWALWriteCalled = true; + return false; + } + + /** + * Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is + * restored. + */ + @Override + public void preWALRestore(ObserverContext env, + HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { + preWALRestoreCalled = true; + } + + /** + * Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is + * restored. 
+ */ + @Override + public void postWALRestore(ObserverContext env, + HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { + postWALRestoreCalled = true; + } + + boolean isPreWALWriteCalled() { + return preWALWriteCalled; + } + + boolean isPostWALWriteCalled() { + return postWALWriteCalled; + } + + boolean isPreWALRestoreCalled() { + return preWALRestoreCalled; + } + + boolean isPostWALRestoreCalled() { + return postWALRestoreCalled; + } +} Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMetadata.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMetadata.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMetadata.java (revision 0) @@ -0,0 +1,304 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.LeaseManager; +import org.apache.hadoop.io.SequenceFile; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** JUnit test case for HLog */ +public class TestHLogMetadata { + private static final Log LOG = LogFactory.getLog(TestHLogMetadata.class); + { + 
((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL); + } + + private static Configuration conf; + private static FileSystem fs; + private static Path dir; + private static MiniDFSCluster cluster; + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static Path hbaseDir; + private static Path oldLogDir; + private static HBaseAdmin admin; + private static HRegionServer server; + + private final byte [] rowName = Bytes.toBytes("row"); + + @Before + public void setUp() throws Exception { + + FileStatus[] entries = fs.listStatus(new Path("/")); + for (FileStatus dir : entries) { + fs.delete(dir.getPath(), true); + } + + } + + @After + public void tearDown() throws Exception { + } + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Make block sizes small. + TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); + // needed for testAppendClose() + TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true); + // quicker heartbeat interval for faster DN death notification + TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000); + TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); + TEST_UTIL.getConfiguration().setInt("dfs.socket.timeout", 5000); + // faster failover with cluster.shutdown();fs.close() idiom + TEST_UTIL.getConfiguration() + .setInt("ipc.client.connect.max.retries", 1); + TEST_UTIL.getConfiguration().setInt( + "dfs.client.block.recovery.retries", 1); + TEST_UTIL.getConfiguration().setInt( + "ipc.client.connection.maxidletime", 500); + TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30); + TEST_UTIL.getConfiguration().setInt( + "hbase.regionserver.hlog.tolerable.lowreplication", 2); + TEST_UTIL.getConfiguration().setInt( + "hbase.regionserver.hlog.lowreplication.rolllimit", 3); + TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, + HLogMetadataWALObserver.class.getName()); + 
TEST_UTIL.startMiniCluster(1); + + conf = TEST_UTIL.getConfiguration(); + cluster = TEST_UTIL.getDFSCluster(); + fs = cluster.getFileSystem(); + admin = TEST_UTIL.getHBaseAdmin(); + + hbaseDir = new Path(TEST_UTIL.getConfiguration().get("hbase.rootdir")); + oldLogDir = new Path(hbaseDir, ".oldlogs"); + dir = new Path(hbaseDir, getName()); + } + + private HLogMetadataWALObserver getCoprocessor(HLog wal) throws Exception { + WALCoprocessorHost host = wal.getCoprocessorHost(); + Coprocessor c = host.findCoprocessor(HLogMetadataWALObserver.class.getName()); + return (HLogMetadataWALObserver)c; + } + + private static String getName() { + return "TestHLogMetadata"; + } + + /** + * Write a record and verify that the metadata appears in the HLog. + * @throws IOException + */ + @Test + public void testHLogMetadata() throws Exception { + + LOG.info("Running testPutMetadata"); + + // Create the test table and open it + final byte [] tableName = Bytes.toBytes(getName()); + HTableDescriptor desc = new HTableDescriptor(getName()); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc); + HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); + + // initialize coprocessor to filter specified table. + HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(tableName); + HLog log = server.getWAL(); + HLogMetadataWALObserver cp = getCoprocessor(log); + cp.setTestValues(tableName, rowName, null, null, null, null, null, null); + + doPutTest(table, cp); + doDeleteTest(table, cp); + doCheckAndPut(table, cp); + } + + /** + * Write a record and verify that the metadata appears in the HLog. 
+ * @throws IOException + */ + private void doPutTest(HTable table, HLogMetadataWALObserver cp) + throws Exception { + + LOG.info("Running doPutTest"); + + // insert a record with hlog metadata + cp.cleanLastMetaBuffer(); + Put put = new Put(rowName); + put.add(HConstants.CATALOG_FAMILY, null, Bytes.toBytes("test")); + put.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("hlogmeta")); + table.put(put); + + // verify that record appears in the HLog + assertTrue(cp.isPostWALWriteCalled()); + assertEquals(1, cp.getNumMetaBuffer()); + assertTrue(Bytes.BYTES_COMPARATOR.compare( + cp.getLastMetaBuffer(), Bytes.toBytes("hlogmeta")) == 0); + + // insert a record with no hlog metadata + cp.cleanLastMetaBuffer(); + put = new Put(rowName); + put.add(HConstants.CATALOG_FAMILY, null, Bytes.toBytes("test1")); + table.put(put); + + // verify that metadata does not appear in the HLog. + assertTrue(cp.isPostWALWriteCalled()); + assertEquals(0, cp.getNumMetaBuffer()); + } + + /** + * Delete a record and verify that the metadata appears in the HLog. 
+ * @throws IOException + */ + private void doDeleteTest(HTable table, HLogMetadataWALObserver cp) + throws Exception { + + LOG.info("Running doDeleteTest"); + + // insert a record with no hlog metadata + cp.cleanLastMetaBuffer(); + Put put = new Put(rowName); + put.add(HConstants.CATALOG_FAMILY, null, Bytes.toBytes("test")); + table.put(put); + assertTrue(cp.isPostWALWriteCalled()); + assertEquals(0, cp.getNumMetaBuffer()); + + // insert a delete record with metadata + cp.cleanLastMetaBuffer(); + Delete delete = new Delete(rowName); + delete.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("hlogdeletemeta")); + table.delete(delete); + + // verify that delete record metadata appears in the HLog + assertTrue(cp.isPostWALWriteCalled()); + assertEquals(1, cp.getNumMetaBuffer()); + assertTrue(Bytes.BYTES_COMPARATOR.compare( + cp.getLastMetaBuffer(), Bytes.toBytes("hlogdeletemeta")) == 0); + } + + /** + * Do a checkAndPut and verify that the metadata appears in the HLog. + * @throws IOException + */ + private void doCheckAndPut(HTable table, HLogMetadataWALObserver cp) + throws Exception { + + LOG.info("Running doCheckAndPut"); + byte[] col = Bytes.toBytes("col"); + + // insert a record with no hlog metadata + cp.cleanLastMetaBuffer(); + Put put = new Put(rowName); + put.add(HConstants.CATALOG_FAMILY, col, Bytes.toBytes("test1")); + table.put(put); + assertTrue(cp.isPostWALWriteCalled()); + assertEquals(0, cp.getNumMetaBuffer()); + + // insert a mutate record with metadata + cp.cleanLastMetaBuffer(); + put = new Put(rowName); + put.add(HConstants.CATALOG_FAMILY, col, Bytes.toBytes("test2")); + put.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("log1")); + boolean done = table.checkAndPut(rowName, HConstants.CATALOG_FAMILY, + col, Bytes.toBytes("test1"), put); + assertTrue(done); + + // verify that put metadata appears in the HLog + assertTrue(cp.isPostWALWriteCalled()); + assertEquals(1, cp.getNumMetaBuffer()); + 
assertTrue(Bytes.BYTES_COMPARATOR.compare( + cp.getLastMetaBuffer(), Bytes.toBytes("log1")) == 0); + } + + /** + * Do a checkAndDelete and verify that the metadata appears in the HLog. + * @throws IOException + */ + private void doCheckAndDelete(HTable table, HLogMetadataWALObserver cp) + throws Exception { + + LOG.info("Running doCheckAndDelete"); + byte[] col = Bytes.toBytes("col"); + + // insert a record with no hlog metadata + cp.cleanLastMetaBuffer(); + Put put = new Put(rowName); + put.add(HConstants.CATALOG_FAMILY, col, Bytes.toBytes("hello")); + table.put(put); + assertTrue(cp.isPostWALWriteCalled()); + assertEquals(0, cp.getNumMetaBuffer()); + + // insert a mutate record with metadata + cp.cleanLastMetaBuffer(); + Delete del = new Delete(rowName); + del.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("log10")); + boolean done = table.checkAndDelete(rowName, HConstants.CATALOG_FAMILY, + col, Bytes.toBytes("hello"), del); + assertTrue(done); + + // verify that checkAndDelete metedata appears in the HLog + assertTrue(cp.isPostWALWriteCalled()); + assertEquals(1, cp.getNumMetaBuffer()); + assertTrue(Bytes.BYTES_COMPARATOR.compare( + cp.getLastMetaBuffer(), Bytes.toBytes("log10")) == 0); + } +} Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1174401) +++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -407,6 +407,8 @@ .toBytes(value)); put.add(Bytes.toBytes("trans-group"), null, Bytes .toBytes("adhocTransactionGroupId")); + put.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("hlogmeta")); ht.put(put); } } @@ -1648,6 +1650,8 @@ delete = new Delete(ROW); delete.deleteColumn(FAMILIES[0], QUALIFIER); // ts[4] + delete.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("hlogmeta")); ht.delete(delete); get 
= new Get(ROW); @@ -3330,10 +3334,14 @@ put = new Put(row); put.add(FAMILY, qualifier, 2L, Bytes.toBytes("BBB")); + put.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("hlogmeta1")); hTable.put(put); put = new Put(row); put.add(FAMILY, qualifier, 3L, Bytes.toBytes("EEE")); + put.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("hlogmeta2")); hTable.put(put); Get get = new Get(row); @@ -4161,6 +4169,8 @@ HTable table = TEST_UTIL.createTable(Bytes.toBytes("testCheckAndPut"), new byte [][] {FAMILY}); Put put1 = new Put(ROW); + put1.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("hlogmeta")); put1.add(FAMILY, QUALIFIER, VALUE); // row doesn't exist, so using non-null value should be considered "not match". Index: src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java (revision 1174401) +++ src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java (working copy) @@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.HConstants; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -91,6 +92,8 @@ private Put createPut(String row) { Put put = new Put( Bytes.toBytes(row)); put.add(FAMILY, QUALIFIER, VALUE); + put.setAttribute(HConstants.WAL_META_ATTRIBUTE_NAME, + Bytes.toBytes("hlogmeta")); return put; } Index: src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1174401) +++ src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -512,6 +512,12 @@ "(" + CP_HTD_ATTR_VALUE_PARAM_KEY_PATTERN + ")=(" + CP_HTD_ATTR_VALUE_PARAM_VALUE_PATTERN + "),?"); + /** + * 
The name of the attribute that can be passed via Put call. The value + * of this attribute is atomically stored in the wal. + */ + public static final String WAL_META_ATTRIBUTE_NAME = "_hbaseWalMetadata"; + private HConstants() { // Can't be instantiated with this ctor. } Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1174401) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -112,6 +112,9 @@ static final Log LOG = LogFactory.getLog(HLog.class); public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY"); static final byte [] METAROW = Bytes.toBytes("METAROW"); + public static final byte [] METACOL = Bytes.toBytes( + HConstants.WAL_META_ATTRIBUTE_NAME); + static final KeyValue METAKV = new KeyValue(METAROW, METAFAMILY, METACOL); /* * Name of directory that holds recovered edits written by the wal log @@ -1341,6 +1344,14 @@ } /** + * Return true if this kv has rowkey as METAROW and + * column family as METAFAMILY + */ + static public boolean isMetaRow(KeyValue kv) { + return KeyValue.COMPARATOR.matchingRowColumn(kv, METAKV); + } + + /** * Abort a cache flush. * Call if the flush fails. Note that the only recovery for an aborted flush * currently is a restart of the regionserver so the snapshot content dropped @@ -1369,6 +1380,19 @@ } /** + * Add an application specified blob to the waledit. + * This record will be ignored by the codepath that + * re-ingests recovered edits. 
+ */ + static public void addWalMetadata(WALEdit e, byte[] blob) { + if (blob != null) { + KeyValue kv = new KeyValue(METAROW, METAFAMILY, METACOL, + System.currentTimeMillis(), blob); + e.add(kv); + } + } + + /** * @param family * @return true if the column is a meta column */ Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1174401) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1402,7 +1402,8 @@ try { // All edits for the given row (across all column families) must happen atomically. prepareDelete(delete); - delete(delete.getFamilyMap(), delete.getClusterId(), writeToWAL); + delete(delete.getFamilyMap(), delete.getClusterId(), writeToWAL, + delete.getAttribute(HConstants.WAL_META_ATTRIBUTE_NAME)); } finally { if(lockid == null) releaseRowLock(lid); } @@ -1413,11 +1414,23 @@ /** * @param familyMap map of family to edits for the given family. + * Used only for unit tests. * @param writeToWAL * @throws IOException */ + void delete(Map> familyMap, UUID clusterId, + boolean writeToWAL) throws IOException { + delete(familyMap, clusterId, writeToWAL, null); // walmetadata is null + } + + + /** + * @param familyMap map of family to edits for the given family. + * @param writeToWAL + * @throws IOException + */ public void delete(Map> familyMap, UUID clusterId, - boolean writeToWAL) throws IOException { + boolean writeToWAL, byte[] walMetadata) throws IOException { /* Run coprocessor pre hook outside of locks to avoid deadlock */ if (coprocessorHost != null) { if (coprocessorHost.preDelete(familyMap, writeToWAL)) { @@ -1485,6 +1498,7 @@ // bunch up all edits across all column families into a // single WALEdit. 
WALEdit walEdit = new WALEdit(); + HLog.addWalMetadata(walEdit, walMetadata); addFamilyMapToWALEdit(familyMap, walEdit); this.log.append(regionInfo, this.htableDescriptor.getName(), walEdit, clusterId, now, this.htableDescriptor); @@ -1562,7 +1576,8 @@ try { // All edits for the given row (across all column families) must happen atomically. // Coprocessor interception happens in put(Map,boolean) - put(put.getFamilyMap(), put.getClusterId(), writeToWAL); + put(put.getFamilyMap(), put.getClusterId(), writeToWAL, + put.getAttribute(HConstants.WAL_META_ATTRIBUTE_NAME)); } finally { if(lockid == null) releaseRowLock(lid); } @@ -1755,6 +1770,8 @@ Put p = batchOp.operations[i].getFirst(); if (!p.getWriteToWAL()) continue; addFamilyMapToWALEdit(familyMaps[i], walEdit); + HLog.addWalMetadata(walEdit, + p.getAttribute(HConstants.WAL_META_ATTRIBUTE_NAME)); } // Append the edit to WAL @@ -1897,11 +1914,14 @@ // originating cluster. A slave cluster receives the result as a Put // or Delete if (isPut) { - put(((Put)w).getFamilyMap(), HConstants.DEFAULT_CLUSTER_ID, writeToWAL); + Put p = (Put)w; + put(p.getFamilyMap(), HConstants.DEFAULT_CLUSTER_ID, writeToWAL, + p.getAttribute(HConstants.WAL_META_ATTRIBUTE_NAME)); } else { Delete d = (Delete)w; prepareDelete(d); - delete(d.getFamilyMap(), HConstants.DEFAULT_CLUSTER_ID, writeToWAL); + delete(d.getFamilyMap(), HConstants.DEFAULT_CLUSTER_ID, writeToWAL, + d.getAttribute(HConstants.WAL_META_ATTRIBUTE_NAME)); } return true; } @@ -1992,7 +2012,9 @@ familyMap = new HashMap>(); familyMap.put(family, edits); - this.put(familyMap, HConstants.DEFAULT_CLUSTER_ID, true); + // the walMetadata is null because this method is used to + // update META tables only. 
+ this.put(familyMap, HConstants.DEFAULT_CLUSTER_ID, true, null); } /** @@ -2003,7 +2025,7 @@ * @throws IOException */ private void put(Map> familyMap, UUID clusterId, - boolean writeToWAL) throws IOException { + boolean writeToWAL, byte[] walMetadata) throws IOException { /* run pre put hook outside of lock to avoid deadlock */ if (coprocessorHost != null) { if (coprocessorHost.prePut(familyMap, writeToWAL)) { @@ -2026,6 +2048,7 @@ // will contain uncommitted transactions. if (writeToWAL) { WALEdit walEdit = new WALEdit(); + HLog.addWalMetadata(walEdit,walMetadata); addFamilyMapToWALEdit(familyMap, walEdit); this.log.append(regionInfo, this.htableDescriptor.getName(), walEdit, clusterId, now, this.htableDescriptor);