Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (revision 1195022) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (working copy) @@ -50,9 +50,6 @@ private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final String DIR = TEST_UTIL.getDataTestDir("TestIncrement").toString(); - - private final int MAX_VERSIONS = 2; - // Test names static final byte[] tableName = Bytes.toBytes("testtable");; static final byte[] qual1 = Bytes.toBytes("qual1"); Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1195022) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -110,8 +110,10 @@ import org.cliffc.high_scale_lib.Counter; import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ClassToInstanceMap; import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; import com.google.common.collect.MutableClassToInstanceMap; /** @@ -2228,7 +2230,7 @@ * @return the additional memory usage of the memstore caused by the * new entries. 
*/ - private long applyFamilyMapToMemstore(Map> familyMap, + private long applyFamilyMapToMemstore(Map> familyMap, ReadWriteConsistencyControl.WriteEntry localizedWriteEntry) { long size = 0; boolean freerwcc = false; @@ -2239,9 +2241,9 @@ freerwcc = true; } - for (Map.Entry> e : familyMap.entrySet()) { + for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); - List edits = e.getValue(); + Collection edits = e.getValue(); Store store = getStore(family); for (KeyValue kv: edits) { @@ -3718,13 +3720,13 @@ */ public Result append(Append append, Integer lockid, boolean writeToWAL) throws IOException { - // TODO: Use RWCC to make this set of appends atomic to reads byte[] row = append.getRow(); checkRow(row, "append"); boolean flush = false; WALEdit walEdits = null; List allKVs = new ArrayList(append.size()); - List kvs = new ArrayList(append.size()); + Multimap newFamilyMap = ArrayListMultimap.create(); + long now = EnvironmentEdgeManager.currentTimeMillis(); long size = 0; long txid = 0; @@ -3740,8 +3742,6 @@ for (Map.Entry> family : append.getFamilyMap() .entrySet()) { - Store store = stores.get(family.getKey()); - // Get previous values for all columns in this family Get get = new Get(row); for (KeyValue kv : family.getValue()) { @@ -3766,7 +3766,7 @@ newKV = new KeyValue(row.length, kv.getFamilyLength(), kv.getQualifierLength(), now, KeyValue.Type.Put, oldKv.getValueLength() + kv.getValueLength()); - // copy in the value + // copy in the updated value System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(), newKV.getBuffer(), newKV.getValueOffset(), oldKv.getValueLength()); @@ -3780,7 +3780,7 @@ newKV = new KeyValue(row.length, kv.getFamilyLength(), kv.getQualifierLength(), now, KeyValue.Type.Put, kv.getValueLength()); - // copy in the value + // copy in the new value System.arraycopy(kv.getBuffer(), kv.getValueOffset(), newKV.getBuffer(), newKV.getValueOffset(), kv.getValueLength()); @@ -3795,7 +3795,8 @@ newKV.getBuffer(), 
newKV.getQualifierOffset(), kv.getQualifierLength()); - kvs.add(newKV); + allKVs.add(newKV); + newFamilyMap.put(family.getKey(), newKV); // Append update to WAL if (writeToWAL) { @@ -3805,11 +3806,6 @@ walEdits.add(newKV); } } - - // Write the KVs for this family into the store - size += store.upsert(kvs); - allKVs.addAll(kvs); - kvs.clear(); } // Actually write to WAL now @@ -3822,15 +3818,35 @@ HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor); } + // atomically update memstore + size = applyFamilyMapToMemstore(newFamilyMap.asMap(), null); size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } finally { this.updatesLock.readLock().unlock(); releaseRowLock(lid); } + // Note that this is not correct + // The WAL should be synched first, before the changes are applied + // to the Memstore, or changes should be rolled back if this fails if (writeToWAL) { this.log.sync(txid); // sync the transaction log outside the rowlock } + + // since these are guaranteed to be shadowed by newer versions + // (we advanced rwcc), it should not matter whether we have + // updateLock.readLock... + try { + // not done atomically + size = removeDupsInMemstore(newFamilyMap.asMap()); + size = this.addAndGetGlobalMemstoreSize(-size); + // still need to flush? + flush = isFlushSize(size); + } catch (Throwable t) { + // these KVs are shadowed, so a failure here is no problem. + // this is just an optimization... 
warn but continue + LOG.warn("Problem removing duplicates", t); + } } finally { closeRegionOperation(); } @@ -3858,14 +3874,13 @@ public Result increment(Increment increment, Integer lockid, boolean writeToWAL) throws IOException { - // TODO: Use RWCC to make this set of increments atomic to reads byte [] row = increment.getRow(); checkRow(row, "increment"); TimeRange tr = increment.getTimeRange(); boolean flush = false; WALEdit walEdits = null; List allKVs = new ArrayList(increment.numColumns()); - List kvs = new ArrayList(increment.numColumns()); + Multimap newFamilyMap = ArrayListMultimap.create(); long now = EnvironmentEdgeManager.currentTimeMillis(); long size = 0; long txid = 0; @@ -3881,8 +3896,6 @@ for (Map.Entry> family : increment.getFamilyMap().entrySet()) { - Store store = stores.get(family.getKey()); - // Get previous values for all columns in this family Get get = new Get(row); for (Map.Entry column : family.getValue().entrySet()) { @@ -3906,7 +3919,8 @@ // Append new incremented KeyValue to list KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(), now, Bytes.toBytes(amount)); - kvs.add(newKV); + newFamilyMap.put(family.getKey(), newKV); + allKVs.add(newKV); // Append update to WAL if (writeToWAL) { @@ -3916,11 +3930,6 @@ walEdits.add(newKV); } } - - // Write the KVs for this family into the store - size += store.upsert(kvs); - allKVs.addAll(kvs); - kvs.clear(); } // Actually write to WAL now @@ -3933,6 +3942,8 @@ this.htableDescriptor); } + // atomically add all values + size = applyFamilyMapToMemstore(newFamilyMap.asMap(), null); size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } finally { @@ -3942,6 +3953,20 @@ if (writeToWAL) { this.log.sync(txid); // sync the transaction log outside the rowlock } + // since these are guaranteed to be shadowed by newer versions + // (we advanced the readpoint), it should not matter whether we have + // updateLock.readLock... 
+ try { + // this is not done atomically + size = removeDupsInMemstore(newFamilyMap.asMap()); + size = this.addAndGetGlobalMemstoreSize(-size); + // still need to flush? + flush = isFlushSize(size); + } catch (Throwable t) { + // these KVs are shadowed, so a failure here is no problem. + // warn but continue + LOG.warn("Problem removing duplicates", t); + } } finally { closeRegionOperation(); } @@ -3954,6 +3979,15 @@ return new Result(allKVs); } + private long removeDupsInMemstore(Map> familyMap) { + long size = 0; + for (Map.Entry> family : familyMap.entrySet()) { + Store store = stores.get(family.getKey()); + size += store.removeDups(family.getValue()); + } + return size; + } + /** * @param row * @param family Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1195022) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -1829,24 +1829,20 @@ } /** - * Adds or replaces the specified KeyValues. + * Removes duplicate KVs. *

* For each KeyValue specified, if a cell with the same row, family, and - * qualifier exists in MemStore, it will be replaced. Otherwise, it will just - * be inserted to MemStore. + * qualifier exists in MemStore with a memstoreTS lower than that of the passed KV, it will be removed. *

- * This operation is atomic on each KeyValue (row/family/qualifier) but not - * necessarily atomic across all of them. + * This operation is not atomic across multiple CFs, but since the KVs are shadowed + * by newer versions it should not matter. * @param kvs * @return memstore size delta - * @throws IOException */ - public long upsert(List kvs) - throws IOException { + public long removeDups(Collection kvs) { this.lock.readLock().lock(); try { - // TODO: Make this operation atomic w/ RWCC - return this.memstore.upsert(kvs); + return this.memstore.removeDups(kvs); } finally { this.lock.readLock().unlock(); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1195022) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -24,6 +24,7 @@ import java.lang.management.RuntimeMXBean; import java.rmi.UnexpectedException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -535,28 +536,42 @@ } /** - * Inserts the specified KeyValue into MemStore and deletes any existing - * versions of the same row/family/qualifier as the specified KeyValue. + * Remove duplicate KeyValues. *

- * First, the specified KeyValue is inserted into the Memstore. + * For each KeyValue if the keyValue did already exist, with a + * memstoreTS < the passed one it will then be removed. *

- * If there are any existing KeyValues in this MemStore with the same row, - * family, and qualifier, they are removed. + * This is called under row lock, so Get operations will still see updates + * atomically. Scans will only see each KeyValue update as atomic. + * + * @param kvs + * @return change in memstore size + */ + public long removeDups(Collection kvs) { + this.lock.readLock().lock(); + try { + long removedSize = 0; + for (KeyValue kv :kvs) { + removedSize += removeDups(kv, kv.getMemstoreTS()); + } + return removedSize; + } finally { + this.lock.readLock().unlock(); + } + } + + /** + * Remove any duplicate KVs from the memstore. + * A duplicate is a KV with the same row, family, and qualifier, + * but with a memstoreTS < the passed one *

* Callers must hold the read lock. - * * @param kv - * @return change in size of MemStore + * @param memstoreTS + * @return */ - private long upsert(KeyValue kv) { - // Add the KeyValue to the MemStore - // Use the internalAdd method here since we (a) already have a lock - // and (b) cannot safely use the MSLAB here without potentially - // hitting OOME - see TestMemStore.testUpsertMSLAB for a - // test that triggers the pathological case if we don't avoid MSLAB - // here. - long addedSize = internalAdd(kv); - + private long removeDups(KeyValue kv, long memstoreTS) { + long removedSize = 0; // Get the KeyValues for the row/family/qualifier regardless of timestamp. // For this case we want to clean up any other puts KeyValue firstKv = KeyValue.createFirstOnRow( @@ -580,11 +595,11 @@ // if the qualifier matches and it's a put, remove it if (kv.matchingQualifier(cur)) { - // to be extra safe we only remove Puts that have a memstoreTS==0 + // to be extra safe we only remove Puts and check the memstoreTS if (kv.getType() == KeyValue.Type.Put.getCode() && - kv.getMemstoreTS() == 0) { + kv.getMemstoreTS() < memstoreTS) { // false means there was a change, so give us the size. - addedSize -= heapSizeChange(kv, true); + removedSize += heapSizeChange(kv, true); it.remove(); } } else { @@ -592,9 +607,34 @@ break; } } - return addedSize; + return removedSize; } + /** + * Inserts the specified KeyValue into MemStore and deletes any existing + * versions of the same row/family/qualifier as the specified KeyValue. + *

+ * First, the specified KeyValue is inserted into the Memstore. + *

+ * If there are any existing KeyValues in this MemStore with the same row, + * family, and qualifier, they are removed. + *

+ * Callers must hold the read lock. + * + * @param kv + * @return change in size of MemStore + */ + private long upsert(KeyValue kv) { + // Add the KeyValue to the MemStore + // Use the internalAdd method here since we (a) already have a lock + // and (b) cannot safely use the MSLAB here without potentially + // hitting OOME - see TestMemStore.testUpsertMSLAB for a + // test that triggers the pathological case if we don't avoid MSLAB + // here. + // Passing 1 as memstoreTS for backward compatibility. + return internalAdd(kv) - removeDups(kv, 1); + } + /* * Immutable data structure to hold member found in set and the set it was * found in. Include set because it is carrying context.