diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 4e362c7..a8bbec9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -77,7 +77,7 @@ public abstract class Compactor { protected final Compression.Algorithm compactionCompression; /** specify how many days to keep MVCC values during major compaction **/ - protected final int keepSeqIdPeriod; + protected int keepSeqIdPeriod; //TODO: depending on Store is not good but, realistically, all compactors currently do. Compactor(final Configuration conf, final Store store) { @@ -419,8 +419,12 @@ public abstract class Compactor { now = EnvironmentEdgeManager.currentTime(); } // output to writer: + Cell lastCleanCell = null; + long lastCleanCellSeqId = 0; for (Cell c : cells) { if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) { + lastCleanCell = c; + lastCleanCellSeqId = c.getSequenceId(); CellUtil.setSequenceId(c, 0); } writer.append(c); @@ -457,6 +461,10 @@ public abstract class Compactor { bytesWrittenProgressForShippedCall = 0; } } + if (lastCleanCell != null) { + // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly + CellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId); + } // Log the progress of long running compactions every minute if // logging at DEBUG level if (LOG.isDebugEnabled()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestSeqId.java new file mode 100644 index 0000000..53375df --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestSeqId.java @@ -0,0 +1,139 @@ +package 
org.apache.hadoop.hbase.regionserver.compactions;

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Regression test for HBASE-16931: during compaction, cells whose sequence id is
 * at or below the smallest read point have their sequence id cleaned to 0 before
 * being written out. If the last cell of a scanner batch keeps the zeroed id, the
 * cross-file ordering comparison on the next batch can go wrong; the fix restores
 * the original sequence id on that last clean cell after the batch is written.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestSeqId {

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] FAMILY = Bytes.toBytes("cf");
  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");

  /**
   * Compactor that never retains MVCC/sequence ids ({@code keepSeqIdPeriod = 0}),
   * so every compaction exercises the cleanSeqId code path under test.
   */
  public static class DummyCompactor extends DefaultCompactor {
    public DummyCompactor(Configuration conf, Store store) {
      super(conf, store);
      this.keepSeqIdPeriod = 0;
    }
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // Use the zero-retention compactor, and a small scanner batch size so the
    // compaction reads the 20 cells in more than one batch (10 per call).
    conf.set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, DummyCompactor.class.getName());
    conf.set("hbase.hstore.compaction.kv.max", "10");
    TEST_UTIL.startMiniCluster(3);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void testCompactionSeqId() throws Exception {
    /*
     * Write ten versions of one qualifier with ascending timestamps and flush them
     * to hfile1, then write ten more versions with descending timestamps and flush
     * them to hfile2. The newest cell of hfile1 and the oldest cell of hfile2 share
     * timestamp 10009 and differ only in sequence id. Major compaction reads
     * hfile2's cells first; with keepSeqIdPeriod == 0 their sequence ids are
     * cleaned to 0, so without the HBASE-16931 restore of the last clean cell's
     * sequence id the scan order between the two files is affected.
     */
    TableName tableName = TableName.valueOf("TestSeqId");
    HTableDescriptor desc = new HTableDescriptor(tableName);
    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
    hcd.setMaxVersions(65536);
    desc.addFamily(hcd);
    TEST_UTIL.getHBaseAdmin().createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(tableName);
    try {
      long timestamp = 10000;
      /*
       * hfile1 (timestamps ascending with the put order), e.g.:
       * row/cf:qualifier/10009/Put/seqid=13 V: v9
       * ...
       * row/cf:qualifier/10000/Put/seqid=4  V: v0
       */
      for (int i = 0; i < 10; i++) {
        Put put = new Put(ROW);
        put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
        table.put(put);
      }
      TEST_UTIL.getHBaseAdmin().flush(tableName);

      /*
       * hfile2 (timestamps descending with the put order), e.g.:
       * row/cf:qualifier/10018/Put/seqid=18 V: v18
       * ...
       * row/cf:qualifier/10009/Put/seqid=27 V: v9   <- same ts as hfile1's newest
       */
      for (int i = 18; i > 8; i--) {
        Put put = new Put(ROW);
        put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
        table.put(put);
      }
      TEST_UTIL.getHBaseAdmin().flush(tableName);

      RegionLocator locator = ((HTable) table).getRegionLocator();
      String regionName =
          locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
      // Major compaction must merge both files without the writer's
      // "key not lexically larger than previous" check tripping on a zeroed seqId.
      region.compact(true);
      assertEquals("Major compaction should leave a single store file", 1,
          region.getStore(FAMILY).getStorefilesCount());
    } finally {
      table.close();
    }
  }
}