From 2b2f30c7f53225149386a25709dca23510a8fb4f Mon Sep 17 00:00:00 2001 From: ChiaPing Tsai Date: Tue, 24 Jan 2017 12:43:40 +0800 Subject: [PATCH] HBASE-17519 Rollback the removed cells --- .../hadoop/hbase/regionserver/DefaultMemStore.java | 12 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 85 ++--- .../apache/hadoop/hbase/regionserver/HStore.java | 5 +- .../apache/hadoop/hbase/regionserver/MemStore.java | 3 +- .../apache/hadoop/hbase/regionserver/Store.java | 3 +- .../hbase/client/TestRollbackFromClient.java | 352 +++++++++++++++++++++ .../hbase/regionserver/TestDefaultMemStore.java | 6 +- 7 files changed, 418 insertions(+), 48 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index a47cafd..7b7446a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -552,7 +552,7 @@ public class DefaultMemStore implements MemStore { // 'now' and a 0 memstoreTS == immediately visible List cells = new ArrayList(1); cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))); - return upsert(cells, 1L); + return upsert(cells, 1L, null); } /** @@ -571,13 +571,14 @@ public class DefaultMemStore implements MemStore { * * @param cells * @param readpoint readpoint below which we can safely remove duplicate KVs + * @param removedCells collect the removed cells. It can be null. * @return change in memstore size */ @Override - public long upsert(Iterable cells, long readpoint) { + public long upsert(Iterable cells, long readpoint, List removedCells) { long size = 0; for (Cell cell : cells) { - size += upsert(cell, readpoint); + size += upsert(cell, readpoint, removedCells); } return size; } @@ -596,7 +597,7 @@ public class DefaultMemStore implements MemStore { * @param cell * @return change in size of MemStore */ - private long upsert(Cell cell, long readpoint) { + private long upsert(Cell cell, long readpoint, List removedCells) { // Add the Cell to the MemStore // Use the internalAdd method here since we (a) already have a lock // and (b) cannot safely use the MSLAB here without potentially @@ -635,6 +636,9 @@ public class DefaultMemStore implements MemStore { long delta = heapSizeChange(cur, true); addedSize -= delta; this.size.addAndGet(-delta); + if (removedCells != null) { + removedCells.add(cur); + } it.remove(); setOldestEditTimeToNow(); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 209f6ad..9ed10d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4025,11 +4025,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * the wal. This method is then invoked to rollback the memstore. */ private void rollbackMemstore(List memstoreCells) { - int kvsRolledback = 0; + rollbackMemstore(null, memstoreCells); + } + private void rollbackMemstore(final Store defaultStore, List memstoreCells) { + int kvsRolledback = 0; for (Cell cell : memstoreCells) { - byte[] family = CellUtil.cloneFamily(cell); - Store store = getStore(family); + Store store = defaultStore; + if (store == null) { + byte[] family = CellUtil.cloneFamily(cell); + store = getStore(family); + } store.rollback(cell); kvsRolledback++; } @@ -7586,12 +7592,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte[] row = mutate.getRow(); checkRow(row, op.toString()); checkFamilies(mutate.getFamilyCellMap().keySet()); - boolean flush = false; Durability durability = getEffectiveDurability(mutate.getDurability()); boolean writeToWAL = durability != Durability.SKIP_WAL; WALEdit walEdits = null; List allKVs = new ArrayList(mutate.size()); Map> tempMemstore = new HashMap>(); + Map> removedCellsForMemStore = new HashMap<>(); long size = 0; long txid = 0; checkReadOnly(); @@ -7718,26 +7724,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // Actually write to Memstore now - if (!tempMemstore.isEmpty()) { - for (Map.Entry> entry : tempMemstore.entrySet()) { - Store store = entry.getKey(); - if (store.getFamily().getMaxVersions() == 1) { - // upsert if VERSIONS for this CF == 1 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); - } else { - // otherwise keep older versions around - size += store.add(entry.getValue()); - if (!entry.getValue().isEmpty()) { - doRollBackMemstore = true; - } + doRollBackMemstore = !tempMemstore.isEmpty(); + for (Map.Entry> entry : tempMemstore.entrySet()) { + Store store = entry.getKey(); + if (store.getFamily().getMaxVersions() == 1) { + List removedCells = removedCellsForMemStore.get(store); + if (removedCells == null) { + removedCells = new ArrayList<>(); + removedCellsForMemStore.put(store, removedCells); } - // We add to all KVs here whereas when doing increment, we do it - // earlier... why? - allKVs.addAll(entry.getValue()); + // upsert if VERSIONS for this CF == 1 + size += store.upsert(entry.getValue(), getSmallestReadPoint(), removedCells); + } else { + // otherwise keep older versions around + size += store.add(entry.getValue()); } - - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); + // We add to all KVs here whereas when doing increment, we do it + // earlier... why? + allKVs.addAll(entry.getValue()); } } finally { this.updatesLock.readLock().unlock(); @@ -7763,7 +7767,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // if the wal sync was unsuccessful, remove keys from memstore WriteEntry we = walKey != null? walKey.getWriteEntry(): null; if (doRollBackMemstore) { - rollbackMemstore(allKVs); + for (Map.Entry> entry: tempMemstore.entrySet()) { + rollbackMemstore(entry.getKey(), entry.getValue()); + } + for (Map.Entry> entry: removedCellsForMemStore.entrySet()) { + entry.getKey().add(entry.getValue()); + } if (we != null) mvcc.complete(we); } else if (we != null) { mvcc.completeAndWait(we); @@ -7775,12 +7784,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (this.metricsRegion != null) { this.metricsRegion.updateAppend(); } - - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); - } - + if (isFlushSize(this.addAndGetGlobalMemstoreSize(size))) requestFlush(); return mutate.isReturnResults() ? Result.create(allKVs) : null; } @@ -7887,7 +7891,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean doRollBackMemstore = false; long accumulatedResultSize = 0; List allKVs = new ArrayList(increment.size()); - List memstoreCells = new ArrayList(); + Map> removedCellsForMemStore = new HashMap<>(); + Map> forMemStore = new HashMap<>(); Durability effectiveDurability = getEffectiveDurability(increment.getDurability()); try { rowLock = getRowLockInternal(increment.getRow(), false); @@ -7907,7 +7912,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WALEdit walEdits = null; // Process increments a Store/family at a time. // Accumulate edits for memstore to add later after we've added to WAL. - Map> forMemStore = new HashMap>(); for (Map.Entry> entry: increment.getFamilyCellMap().entrySet()) { byte [] columnFamilyName = entry.getKey(); List increments = entry.getValue(); @@ -7956,19 +7960,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // Now write to memstore, a family at a time. + doRollBackMemstore = !forMemStore.isEmpty(); for (Map.Entry> entry: forMemStore.entrySet()) { Store store = entry.getKey(); List results = entry.getValue(); if (store.getFamily().getMaxVersions() == 1) { + List removedCells = removedCellsForMemStore.get(store); + if (removedCells == null) { + removedCells = new ArrayList<>(); + removedCellsForMemStore.put(store, removedCells); + } // Upsert if VERSIONS for this CF == 1 - accumulatedResultSize += store.upsert(results, getSmallestReadPoint()); - // TODO: St.Ack 20151222 Why no rollback in this case? + accumulatedResultSize += store.upsert(results, getSmallestReadPoint(), removedCells); } else { // Otherwise keep older versions around accumulatedResultSize += store.add(entry.getValue()); - if (!entry.getValue().isEmpty()) { - doRollBackMemstore = true; - } } } } finally { @@ -7993,7 +7999,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { - rollbackMemstore(memstoreCells); + for (Map.Entry> entry: forMemStore.entrySet()) { + rollbackMemstore(entry.getKey(), entry.getValue()); + } + for (Map.Entry> entry: removedCellsForMemStore.entrySet()) { + entry.getKey().add(entry.getValue()); + } if (walKey != null) mvcc.complete(walKey.getWriteEntry()); } else { if (walKey != null) mvcc.completeAndWait(walKey.getWriteEntry()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index a15bf13..b2cc3a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2388,10 +2388,11 @@ public class HStore implements Store { } @Override - public long upsert(Iterable cells, long readpoint) throws IOException { + public long upsert(Iterable cells, long readpoint, + List removedCells) throws IOException { this.lock.readLock().lock(); try { - return this.memstore.upsert(cells, readpoint); + return this.memstore.upsert(cells, readpoint, removedCells); } finally { this.lock.readLock().unlock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index a885d79..5e5a1ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -135,9 +135,10 @@ public interface MemStore extends HeapSize { * only see each KeyValue update as atomic. * @param cells * @param readpoint readpoint below which we can safely remove duplicate Cells. + * @param removedCells collect the removed cells. It can be null. * @return change in memstore size */ - long upsert(Iterable cells, long readpoint); + long upsert(Iterable cells, long readpoint, List removedCells); /** * @return scanner over the memstore. This might include scanner over the snapshot when one is diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index ccdc523..9d5d3b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -138,10 +138,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * across all of them. * @param cells * @param readpoint readpoint below which we can safely remove duplicate KVs + * @param removedCells collect the removed cells. It can be null. * @return memstore size delta * @throws IOException */ - long upsert(Iterable cells, long readpoint) throws IOException; + long upsert(Iterable cells, long readpoint, List removedCells) throws IOException; /** * Adds a value to the memstore diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java new file mode 100644 index 0000000..9230f31 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java @@ -0,0 +1,352 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.junit.AfterClass; +import org.junit.Assert; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category(SmallTests.class) +public class TestRollbackFromClient { + @Rule + public TestName name = new TestName(); + private final static HBaseTestingUtility TEST_UTIL + = new HBaseTestingUtility(); + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final int SLAVES = 3; + private static final byte[] ROW = Bytes.toBytes("testRow"); + private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte[] QUALIFIER_V2 = Bytes.toBytes("testQualifierV2"); + private static final byte[] VALUE = Bytes.toBytes("testValue"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, FailedDefaultWALProvider.class.getName()); + TEST_UTIL.startMiniCluster(SLAVES); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testAppendRollback() throws IOException { + Updater updateForEmptyTable = new Updater() { + @Override + public int updateData(Table table, byte[] family) { + try { + Append append = new Append(ROW); + append.add(FAMILY, QUALIFIER, VALUE); + append.add(FAMILY, QUALIFIER_V2, VALUE); + FailedHLog.SHOULD_FAIL.set(true); + table.append(append); + } catch (IOException e) { + // It should fail because the WAL fail also + } finally { + FailedHLog.SHOULD_FAIL.set(false); + } + return 0; + } + }; + testRollback(updateForEmptyTable, 1, null); + testRollback(updateForEmptyTable, 2, null); + + final Append preAppend = new Append(ROW); + preAppend.add(FAMILY, QUALIFIER, VALUE); + Cell initCell = preAppend.getCellList(FAMILY).get(0); + Updater updateForNonEmptyTable = new Updater() { + @Override + public int updateData(Table table, byte[] family) throws IOException { + table.append(preAppend); + try { + Append append = new Append(ROW); + append.add(FAMILY, QUALIFIER, VALUE); + append.add(FAMILY, QUALIFIER_V2, VALUE); + FailedHLog.SHOULD_FAIL.set(true); + table.append(append); + Assert.fail("It should fail because the WAL sync is failed"); + } catch (IOException e) { + } finally { + FailedHLog.SHOULD_FAIL.set(false); + } + return 1; + } + }; + testRollback(updateForNonEmptyTable, 1, initCell); + testRollback(updateForNonEmptyTable, 2, initCell); + } + + @Test + public void testIncrementRollback() throws IOException { + Updater updateForEmptyTable = new Updater() { + @Override + public int updateData(Table table, byte[] family) { + try { + Increment inc = new Increment(ROW); + inc.addColumn(FAMILY, QUALIFIER, 1); + inc.addColumn(FAMILY, QUALIFIER_V2, 2); + FailedHLog.SHOULD_FAIL.set(true); + table.increment(inc); + } catch (IOException e) { + // It should fail because the WAL fail also + } finally { + FailedHLog.SHOULD_FAIL.set(false); + } + return 0; + } + }; + testRollback(updateForEmptyTable, 1, null); + testRollback(updateForEmptyTable, 2, null); + + final Increment preIncrement = new Increment(ROW); + preIncrement.addColumn(FAMILY, QUALIFIER, 1); + Cell initCell = preIncrement.getCellList(FAMILY).get(0); + Updater updateForNonEmptyTable = new Updater() { + @Override + public int updateData(Table table, byte[] family) throws IOException { + table.increment(preIncrement); + try { + Increment inc = new Increment(ROW); + inc.addColumn(FAMILY, QUALIFIER, 1); + inc.addColumn(FAMILY, QUALIFIER_V2, 2); + FailedHLog.SHOULD_FAIL.set(true); + table.increment(inc); + Assert.fail("It should fail because the WAL sync is failed"); + } catch (IOException e) { + } finally { + FailedHLog.SHOULD_FAIL.set(false); + } + return 1; + } + }; + testRollback(updateForNonEmptyTable, 1, initCell); + testRollback(updateForNonEmptyTable, 2, initCell); + } + + @Test + public void testPutRollback() throws IOException { + Updater updateForEmptyTable = new Updater() { + @Override + public int updateData(Table table, byte[] family) { + try { + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, VALUE); + FailedHLog.SHOULD_FAIL.set(true); + table.put(put); + Assert.fail("It should fail because the WAL sync is failed"); + } catch (IOException e) { + } finally { + FailedHLog.SHOULD_FAIL.set(false); + } + return 0; + } + }; + testRollback(updateForEmptyTable, 1, null); + testRollback(updateForEmptyTable, 2, null); + + final Put prePut = new Put(ROW); + prePut.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("aaaaaaaaaaaaaaaaaaaaaa")); + Cell preCell = prePut.getCellList(FAMILY).get(0); + Updater updateForNonEmptyTable = new Updater() { + @Override + public int updateData(Table table, byte[] family) throws IOException { + table.put(prePut); + try { + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, VALUE); + FailedHLog.SHOULD_FAIL.set(true); + table.put(put); + Assert.fail("It should fail because the WAL sync is failed"); + } catch (IOException e) { + } finally { + FailedHLog.SHOULD_FAIL.set(false); + } + return 1; + } + }; + testRollback(updateForNonEmptyTable, 1, preCell); + testRollback(updateForNonEmptyTable, 2, preCell); + } + + private void testRollback(Updater updater, int versions, Cell initCell) throws IOException { + TableName tableName = TableName.valueOf(this.name.getMethodName()); + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor col = new HColumnDescriptor(FAMILY); + col.setMaxVersions(versions); + desc.addFamily(col); + TEST_UTIL.getHBaseAdmin().createTable(desc); + int expected; + List cells; + try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + Table table = conn.getTable(tableName)) { + expected = updater.updateData(table, FAMILY); + cells = getAllCells(table); + } + TEST_UTIL.getHBaseAdmin().disableTable(tableName); + TEST_UTIL.getHBaseAdmin().deleteTable(tableName); + assertEquals(expected, cells.size()); + if (initCell != null && cells.isEmpty()) { + Cell cell = cells.get(0); + assertTrue("row isn't matched", CellUtil.matchingRow(initCell, cell)); + assertTrue("column isn't matched", CellUtil.matchingColumn(initCell, cell)); + assertTrue("qualifier isn't matched", CellUtil.matchingQualifier(initCell, cell)); + assertTrue("value isn't matched", CellUtil.matchingValue(initCell, cell)); + } + } + + interface Updater { + int updateData(Table table, byte[] family) throws IOException; + } + + private static List getAllCells(Table table) throws IOException { + List cells = new ArrayList<>(); + try (ResultScanner scanner = table.getScanner(new Scan())) { + for (Result r : scanner) { + cells.addAll(r.listCells()); + } + return cells; + } + } + + public static class FailedDefaultWALProvider extends DefaultWALProvider { + @Override + public WAL getWAL(final byte[] identifier, byte[] namespace) throws IOException { + WAL wal = super.getWAL(identifier, namespace); + return new FailedHLog(wal); + } + } + + public static class FailedHLog implements WAL { + private static final AtomicBoolean SHOULD_FAIL = new AtomicBoolean(false); + private final WAL delegation; + FailedHLog(final WAL delegation) { + this.delegation = delegation; + } + @Override + public void registerWALActionsListener(WALActionsListener listener) { + delegation.registerWALActionsListener(listener); + } + + @Override + public boolean unregisterWALActionsListener(WALActionsListener listener) { + return delegation.unregisterWALActionsListener(listener); + } + + @Override + public byte[][] rollWriter() throws FailedLogCloseException, IOException { + return delegation.rollWriter(); + } + + @Override + public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { + return delegation.rollWriter(force); + } + + @Override + public void shutdown() throws IOException { + delegation.shutdown(); + } + + @Override + public void close() throws IOException { + delegation.close(); + } + + @Override + public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException { + return delegation.append(htd, info, key, edits, inMemstore); + } + + @Override + public void sync() throws IOException { + delegation.sync(); + } + + @Override + public void sync(long txid) throws IOException { + if (SHOULD_FAIL.get()) { + throw new IOException("[TESTING] we need the failure!!!"); + } + delegation.sync(txid); + } + + @Override + public Long startCacheFlush(byte[] encodedRegionName, Set families) { + return delegation.startCacheFlush(encodedRegionName, families); + } + + @Override + public void completeCacheFlush(byte[] encodedRegionName) { + delegation.completeCacheFlush(encodedRegionName); + } + + @Override + public void abortCacheFlush(byte[] encodedRegionName) { + delegation.abortCacheFlush(encodedRegionName); + } + + @Override + public WALCoprocessorHost getCoprocessorHost() { + return delegation.getCoprocessorHost(); + } + + @Override + public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { + return delegation.getEarliestMemstoreSeqNum(encodedRegionName); + } + + @Override + public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) { + return delegation.getEarliestMemstoreSeqNum(encodedRegionName, familyName); + } + + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 32adf5b..7cb74b6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -866,7 +866,7 @@ public class TestDefaultMemStore extends TestCase { kv1.setSequenceId(1); kv2.setSequenceId(1);kv3.setSequenceId(1); l.add(kv1); l.add(kv2); l.add(kv3); - this.memstore.upsert(l, 2);// readpoint is 2 + this.memstore.upsert(l, 2, null);// readpoint is 2 long newSize = this.memstore.size.get(); assert(newSize > oldSize); //The kv1 should be removed. @@ -875,7 +875,7 @@ public class TestDefaultMemStore extends TestCase { KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); kv4.setSequenceId(1); l.clear(); l.add(kv4); - this.memstore.upsert(l, 3); + this.memstore.upsert(l, 3, null); assertEquals(newSize, this.memstore.size.get()); //The kv2 should be removed. assert(memstore.cellSet.size() == 2); @@ -919,7 +919,7 @@ public class TestDefaultMemStore extends TestCase { KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); kv1.setSequenceId(100); l.add(kv1); - memstore.upsert(l, 1000); + memstore.upsert(l, 1000, null); t = memstore.timeOfOldestEdit(); assertTrue(t == 1234); } finally { -- 2.9.3