Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (revision 1195022) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (working copy) @@ -50,9 +50,6 @@ private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final String DIR = TEST_UTIL.getDataTestDir("TestIncrement").toString(); - - private final int MAX_VERSIONS = 2; - // Test names static final byte[] tableName = Bytes.toBytes("testtable");; static final byte[] qual1 = Bytes.toBytes("qual1"); Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1195022) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -3718,13 +3718,14 @@ */ public Result append(Append append, Integer lockid, boolean writeToWAL) throws IOException { - // TODO: Use RWCC to make this set of appends atomic to reads byte[] row = append.getRow(); checkRow(row, "append"); boolean flush = false; WALEdit walEdits = null; List allKVs = new ArrayList(append.size()); - List kvs = new ArrayList(append.size()); + Map> newFamilyMap = new TreeMap>(Bytes.BYTES_COMPARATOR); + long now = EnvironmentEdgeManager.currentTimeMillis(); long size = 0; long txid = 0; @@ -3740,8 +3741,6 @@ for (Map.Entry> family : append.getFamilyMap() .entrySet()) { - Store store = stores.get(family.getKey()); - // Get previous values for all columns in this family Get get = new Get(row); for (KeyValue kv : family.getValue()) { @@ -3755,6 +3754,8 @@ // Avoid as much copying as possible. Every byte is copied at most // once. 
// Would be nice if KeyValue had scatter/gather logic + List kvs = new ArrayList(family.getValue().size()); + newFamilyMap.put(family.getKey(), kvs); int idx = 0; for (KeyValue kv : family.getValue()) { KeyValue newKV; @@ -3766,7 +3767,7 @@ newKV = new KeyValue(row.length, kv.getFamilyLength(), kv.getQualifierLength(), now, KeyValue.Type.Put, oldKv.getValueLength() + kv.getValueLength()); - // copy in the value + // copy in the updated value System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(), newKV.getBuffer(), newKV.getValueOffset(), oldKv.getValueLength()); @@ -3780,7 +3781,7 @@ newKV = new KeyValue(row.length, kv.getFamilyLength(), kv.getQualifierLength(), now, KeyValue.Type.Put, kv.getValueLength()); - // copy in the value + // copy in the new value System.arraycopy(kv.getBuffer(), kv.getValueOffset(), newKV.getBuffer(), newKV.getValueOffset(), kv.getValueLength()); @@ -3796,41 +3797,34 @@ kv.getQualifierLength()); kvs.add(newKV); - - // Append update to WAL - if (writeToWAL) { - if (walEdits == null) { - walEdits = new WALEdit(); - } - walEdits.add(newKV); - } } - - // Write the KVs for this family into the store - size += store.upsert(kvs); allKVs.addAll(kvs); - kvs.clear(); } // Actually write to WAL now if (writeToWAL) { - // Using default cluster id, as this can only happen in the orginating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. 
+ walEdits = new WALEdit(); + addFamilyMapToWALEdit(newFamilyMap, walEdits); txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(), walEdits, HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor); } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); + // atomically update memstore + size = applyFamilyMapToMemstore(newFamilyMap, null); } finally { this.updatesLock.readLock().unlock(); releaseRowLock(lid); } + if (writeToWAL) { this.log.sync(txid); // sync the transaction log outside the rowlock } + + // not done atomically + size -= removeDupsInMemstore(newFamilyMap); + size = this.addAndGetGlobalMemstoreSize(size); + flush = isFlushSize(size); } finally { closeRegionOperation(); } @@ -3858,14 +3852,15 @@ public Result increment(Increment increment, Integer lockid, boolean writeToWAL) throws IOException { - // TODO: Use RWCC to make this set of increments atomic to reads byte [] row = increment.getRow(); checkRow(row, "increment"); TimeRange tr = increment.getTimeRange(); boolean flush = false; WALEdit walEdits = null; List allKVs = new ArrayList(increment.numColumns()); - List kvs = new ArrayList(increment.numColumns()); + Map> newFamilyMap = new TreeMap>(Bytes.BYTES_COMPARATOR); + long now = EnvironmentEdgeManager.currentTimeMillis(); long size = 0; long txid = 0; @@ -3881,8 +3876,6 @@ for (Map.Entry> family : increment.getFamilyMap().entrySet()) { - Store store = stores.get(family.getKey()); - // Get previous values for all columns in this family Get get = new Get(row); for (Map.Entry column : family.getValue().entrySet()) { @@ -3893,6 +3886,8 @@ // Iterate the input columns and update existing values if they were // found, otherwise add new column initialized to the increment amount + List kvs = new ArrayList(family.getValue().size()); + newFamilyMap.put(family.getKey(), kvs); int idx = 0; for (Map.Entry column : family.getValue().entrySet()) { long amount = column.getValue(); @@ -3906,35 +3901,21 @@ // Append 
new incremented KeyValue to list KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(), now, Bytes.toBytes(amount)); - kvs.add(newKV); - // Append update to WAL - if (writeToWAL) { - if (walEdits == null) { - walEdits = new WALEdit(); - } - walEdits.add(newKV); - } + kvs.add(newKV); } - - // Write the KVs for this family into the store - size += store.upsert(kvs); allKVs.addAll(kvs); - kvs.clear(); } - - // Actually write to WAL now if (writeToWAL) { - // Using default cluster id, as this can only happen in the orginating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. + walEdits = new WALEdit(); + addFamilyMapToWALEdit(newFamilyMap, walEdits); txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(), walEdits, HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor); } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); + // atomically add all values + size = applyFamilyMapToMemstore(newFamilyMap, null); } finally { this.updatesLock.readLock().unlock(); releaseRowLock(lid); @@ -3942,6 +3923,9 @@ if (writeToWAL) { this.log.sync(txid); // sync the transaction log outside the rowlock } + size -= removeDupsInMemstore(newFamilyMap); + size = this.addAndGetGlobalMemstoreSize(size); + flush = isFlushSize(size); } finally { closeRegionOperation(); } @@ -3954,6 +3938,15 @@ return new Result(allKVs); } + private long removeDupsInMemstore(Map> familyMap) { + long size = 0; + for (Map.Entry> family : familyMap.entrySet()) { + Store store = stores.get(family.getKey()); + size += store.removeDups(family.getValue()); + } + return size; + } + /** * @param row * @param family Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1195022) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -1829,24 
+1829,20 @@ } /** - * Adds or replaces the specified KeyValues. + * Removes duplicate KVs. *

- * For each KeyValue specified, if a cell with the same row, family, and - * qualifier exists in MemStore, it will be replaced. Otherwise, it will just - * be inserted to MemStore. + * For each KeyValue specified, if a cell with the same row, family, and qualifier, + * but a smaller memstoreTS exists, it will be removed. *

- * This operation is atomic on each KeyValue (row/family/qualifier) but not - * necessarily atomic across all of them. + * This operation is not atomic across multiple CFs, but since the KVs are shadowed + * by newer versions it should not matter. * @param kvs * @return memstore size delta - * @throws IOException */ - public long upsert(List kvs) - throws IOException { + public long removeDups(List kvs) { this.lock.readLock().lock(); try { - // TODO: Make this operation atomic w/ RWCC - return this.memstore.upsert(kvs); + return this.memstore.removeDups(kvs); } finally { this.lock.readLock().unlock(); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1195022) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -24,6 +24,7 @@ import java.lang.management.RuntimeMXBean; import java.rmi.UnexpectedException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -535,28 +536,42 @@ } /** - * Inserts the specified KeyValue into MemStore and deletes any existing - * versions of the same row/family/qualifier as the specified KeyValue. + * Remove duplicate KeyValues. *

- * First, the specified KeyValue is inserted into the Memstore. + * For each KeyValue if the keyValue does already exist, with a + * memstoreTS < the passed one it will then be removed. *

- * If there are any existing KeyValues in this MemStore with the same row, - * family, and qualifier, they are removed. + * HRegion.append/increment call this after releasing the row lock, so the cleanup is + * not atomic with the update; only KeyValues shadowed by a newer version are removed. + * + * @param kvs + * @return heap size freed by the removed KeyValues + */ + public long removeDups(List kvs) { + this.lock.readLock().lock(); + try { + long removedSize = 0; + for (KeyValue kv :kvs) { + removedSize += removeDups(kv, kv.getMemstoreTS()); + } + return removedSize; + } finally { + this.lock.readLock().unlock(); + } + } + + /** + * Remove any duplicate KVs from the memstore. + * A duplicate is a KV with the same row, family, and qualifier, + * but with a memstoreTS < the passed one *

* Callers must hold the read lock. - * * @param kv - * @return change in size of MemStore + * @param memstoreTS only existing KVs with a smaller memstoreTS are removed + * @return heap size freed by the removed KVs */ - private long upsert(KeyValue kv) { - // Add the KeyValue to the MemStore - // Use the internalAdd method here since we (a) already have a lock - // and (b) cannot safely use the MSLAB here without potentially - // hitting OOME - see TestMemStore.testUpsertMSLAB for a - // test that triggers the pathological case if we don't avoid MSLAB - // here. - long addedSize = internalAdd(kv); - + private long removeDups(KeyValue kv, long memstoreTS) { + long removedSize = 0; // Get the KeyValues for the row/family/qualifier regardless of timestamp. // For this case we want to clean up any other puts KeyValue firstKv = KeyValue.createFirstOnRow( @@ -580,11 +595,11 @@ // if the qualifier matches and it's a put, remove it if (kv.matchingQualifier(cur)) { - // to be extra safe we only remove Puts that have a memstoreTS==0 + // to be extra safe we only remove Puts and check the memstoreTS of the existing KV if (kv.getType() == KeyValue.Type.Put.getCode() && - kv.getMemstoreTS() == 0) { + cur.getMemstoreTS() < memstoreTS) { // false means there was a change, so give us the size. - addedSize -= heapSizeChange(kv, true); + removedSize += heapSizeChange(cur, true); it.remove(); } } else { @@ -592,9 +607,34 @@ break; } } - return addedSize; + return removedSize; } + /** + * Inserts the specified KeyValue into MemStore and deletes any existing + * versions of the same row/family/qualifier as the specified KeyValue. + *

+ * First, the specified KeyValue is inserted into the Memstore. + *

+ * If there are any existing KeyValues in this MemStore with the same row, + * family, and qualifier, they are removed. + *

+ * Callers must hold the read lock. + * + * @param kv + * @return change in size of MemStore + */ + private long upsert(KeyValue kv) { + // Add the KeyValue to the MemStore + // Use the internalAdd method here since we (a) already have a lock + // and (b) cannot safely use the MSLAB here without potentially + // hitting OOME - see TestMemStore.testUpsertMSLAB for a + // test that triggers the pathological case if we don't avoid MSLAB + // here. + // Passing 1 as memstoreTS for backward compatibility. + return internalAdd(kv) - removeDups(kv, 1); + } + /* * Immutable data structure to hold member found in set and the set it was * found in. Include set because it is carrying context.