Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (revision 1195022)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (working copy)
@@ -50,9 +50,6 @@
   private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private final String DIR = TEST_UTIL.getDataTestDir("TestIncrement").toString();
-
-  private final int MAX_VERSIONS = 2;
-
   // Test names
   static final byte[] tableName = Bytes.toBytes("testtable");;
   static final byte[] qual1 = Bytes.toBytes("qual1");
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1195022)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -2272,28 +2272,35 @@
           != OperationStatusCode.SUCCESS) {
         continue;
       }
-      // Rollback all the kvs for this row.
-      Map<byte[], List<KeyValue>> familyMap = familyMaps[i];
-      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
-        byte[] family = e.getKey();
-        List<KeyValue> edits = e.getValue();
-
-        // Remove those keys from the memstore that matches our
-        // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
-        // that even the memstoreTS has to match for keys that will be rolleded-back.
-        Store store = getStore(family);
-        for (KeyValue kv: edits) {
-          store.rollback(kv);
-          kvsRolledback++;
-        }
-      }
+      kvsRolledback += rollbackMemstore(familyMaps[i]);
     }
     LOG.debug("rollbackMemstore rolled back " + kvsRolledback +
         " keyvalues from start:" + start + " to end:" + end);
   }
 
   /**
+   * Remove all the keys for a single family from the memstore.
+   */
+  private int rollbackMemstore(Map<byte[], List<KeyValue>> familyMap) {
+    int kvsRolledback = 0;
+    for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
+      byte[] family = e.getKey();
+      List<KeyValue> edits = e.getValue();
+
+      // Remove those keys from the memstore that match our
+      // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
+      // that even the memstoreTS has to match for keys that will be rolled back.
+      Store store = getStore(family);
+      for (KeyValue kv: edits) {
+        store.rollback(kv);
+      }
+      kvsRolledback += edits.size();
+    }
+    return kvsRolledback;
+  }
+
+  /**
    * Check the collection of families for validity.
    * @throws NoSuchColumnFamilyException if a family does not exist.
    */
@@ -3701,10 +3708,71 @@
     return results;
   }
 
-  // TODO: There's a lot of boiler plate code identical
-  // to increment... See how to better unify that.
   /**
-   *
+   * Caller *must* hold updatesLock.readLock and the row lock.
+   *
+   * Both locks will be released when this method returns.
+   *
+   * @param newFamilyMap Map of KVs to upsert
+   * @param lid Lockid for the row
+   * @param now
+   * @param writeToWAL
+   * @return true if the memstore needs to be flushed
+   * @throws IOException
+   */
+  private boolean atomicUpsert(Map<byte[], List<KeyValue>> newFamilyMap,
+      Integer lid, long now, boolean writeToWAL) throws IOException {
+    assert isRowLocked(lid);
+    WALEdit walEdits;
+    boolean syncedWAL = !writeToWAL;
+    boolean unlocked = false;
+    try {
+      // 1. apply changes to MemStore
+      long size = applyFamilyMapToMemstore(newFamilyMap, null);
+
+      // 2. release locks
+      this.updatesLock.readLock().unlock();
+      releaseRowLock(lid);
+      unlocked = true;
+
+      // 3. sync the WAL
+      if (writeToWAL) {
+        walEdits = new WALEdit();
+        addFamilyMapToWALEdit(newFamilyMap, walEdits);
+        this.log.append(regionInfo, this.htableDescriptor.getName(), walEdits,
+            HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
+        syncedWAL = true;
+      }
+
+      // 4. remove dups (this is the "upsert" part)
+      size -= removeDupsInMemstore(newFamilyMap);
+      size = this.addAndGetGlobalMemstoreSize(size);
+
+      return isFlushSize(size);
+    } finally {
+      // 5. cleanup
+      if (!unlocked) {
+        // be extra safe to release the locks
+        this.updatesLock.readLock().unlock();
+        releaseRowLock(lid);
+      }
+      if (!syncedWAL) {
+        // rollback the memstore changes if sync'ing the WAL failed
+        rollbackMemstore(newFamilyMap);
+      }
+    }
+  }
+
+  private long removeDupsInMemstore(Map<byte[], List<KeyValue>> familyMap) {
+    long size = 0;
+    for (Map.Entry<byte[], List<KeyValue>> family : familyMap.entrySet()) {
+      Store store = stores.get(family.getKey());
+      size += store.removeDups(family.getValue());
+    }
+    return size;
+  }
+
+  /**
    * Perform one or more append operations on a row.
    * <p>
   * Appends performed are done under row lock but reads do not take locks out
@@ -3718,119 +3786,85 @@
    */
   public Result append(Append append, Integer lockid, boolean writeToWAL)
       throws IOException {
-    // TODO: Use RWCC to make this set of appends atomic to reads
     byte[] row = append.getRow();
     checkRow(row, "append");
     boolean flush = false;
-    WALEdit walEdits = null;
+
     List<KeyValue> allKVs = new ArrayList<KeyValue>(append.size());
-    List<KeyValue> kvs = new ArrayList<KeyValue>(append.size());
+    Map<byte[], List<KeyValue>> newFamilyMap = new TreeMap<byte[], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
+
     long now = EnvironmentEdgeManager.currentTimeMillis();
-    long size = 0;
-    long txid = 0;
 
-    // Lock row
+    // Lock row/region
     startRegionOperation();
     this.writeRequestsCount.increment();
     try {
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
-      try {
-        // Process each family
-        for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap()
-            .entrySet()) {
-          Store store = stores.get(family.getKey());
+      for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap()
+          .entrySet()) {
 
-          // Get previous values for all columns in this family
-          Get get = new Get(row);
-          for (KeyValue kv : family.getValue()) {
-            get.addColumn(family.getKey(), kv.getQualifier());
-          }
-          List<KeyValue> results = get(get, false);
+        // Get previous values for all columns in this family
+        Get get = new Get(row);
+        for (KeyValue kv : family.getValue()) {
+          get.addColumn(family.getKey(), kv.getQualifier());
+        }
+        List<KeyValue> results = get(get, false);
 
-          // Iterate the input columns and update existing values if they were
-          // found, otherwise add new column initialized to the append value
-
-          // Avoid as much copying as possible. Every byte is copied at most
-          // once.
-          // Would be nice if KeyValue had scatter/gather logic
-          int idx = 0;
-          for (KeyValue kv : family.getValue()) {
-            KeyValue newKV;
-            if (idx < results.size()
-                && results.get(idx).matchingQualifier(kv.getBuffer(),
-                    kv.getQualifierOffset(), kv.getQualifierLength())) {
-              KeyValue oldKv = results.get(idx);
-              // allocate an empty kv once
-              newKV = new KeyValue(row.length, kv.getFamilyLength(),
-                  kv.getQualifierLength(), now, KeyValue.Type.Put,
-                  oldKv.getValueLength() + kv.getValueLength());
-              // copy in the value
-              System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
-                  newKV.getBuffer(), newKV.getValueOffset(),
-                  oldKv.getValueLength());
-              System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
-                  newKV.getBuffer(),
-                  newKV.getValueOffset() + oldKv.getValueLength(),
-                  kv.getValueLength());
-              idx++;
-            } else {
-              // allocate an empty kv once
-              newKV = new KeyValue(row.length, kv.getFamilyLength(),
-                  kv.getQualifierLength(), now, KeyValue.Type.Put,
-                  kv.getValueLength());
-              // copy in the value
-              System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
-                  newKV.getBuffer(), newKV.getValueOffset(),
-                  kv.getValueLength());
-            }
-            // copy in row, family, and qualifier
-            System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
-                newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
-            System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
-                newKV.getBuffer(), newKV.getFamilyOffset(),
-                kv.getFamilyLength());
-            System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
-                newKV.getBuffer(), newKV.getQualifierOffset(),
-                kv.getQualifierLength());
-
-            kvs.add(newKV);
-
-            // Append update to WAL
-            if (writeToWAL) {
-              if (walEdits == null) {
-                walEdits = new WALEdit();
-              }
-              walEdits.add(newKV);
-            }
+        // Iterate the input columns and update existing values if they were
+        // found, otherwise add new column initialized to the append value
+        // Avoid as much copying as possible. Every byte is copied at most
+        // once.
+        // Would be nice if KeyValue had scatter/gather logic
+        List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
+        newFamilyMap.put(family.getKey(), kvs);
+        int idx = 0;
+        for (KeyValue kv : family.getValue()) {
+          KeyValue newKV;
+          if (idx < results.size()
+              && results.get(idx).matchingQualifier(kv.getBuffer(),
+                  kv.getQualifierOffset(), kv.getQualifierLength())) {
+            KeyValue oldKv = results.get(idx);
+            // allocate an empty kv once
+            newKV = new KeyValue(row.length, kv.getFamilyLength(),
+                kv.getQualifierLength(), now, KeyValue.Type.Put,
+                oldKv.getValueLength() + kv.getValueLength());
+            // copy in the updated value
+            System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
+                newKV.getBuffer(), newKV.getValueOffset(),
+                oldKv.getValueLength());
+            System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
+                newKV.getBuffer(),
+                newKV.getValueOffset() + oldKv.getValueLength(),
+                kv.getValueLength());
+            idx++;
+          } else {
+            // allocate an empty kv once
+            newKV = new KeyValue(row.length, kv.getFamilyLength(),
+                kv.getQualifierLength(), now, KeyValue.Type.Put,
+                kv.getValueLength());
+            // copy in the new value
+            System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
+                newKV.getBuffer(), newKV.getValueOffset(), kv.getValueLength());
           }
+          // copy in row, family, and qualifier
+          System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
+              newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
+          System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
+              newKV.getBuffer(), newKV.getFamilyOffset(), kv.getFamilyLength());
+          System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
+              newKV.getBuffer(), newKV.getQualifierOffset(),
+              kv.getQualifierLength());
 
-          // Write the KVs for this family into the store
-          size += store.upsert(kvs);
-          allKVs.addAll(kvs);
-          kvs.clear();
+          kvs.add(newKV);
         }
+        allKVs.addAll(kvs);
+      }
 
-        // Actually write to WAL now
-        if (writeToWAL) {
-          // Using default cluster id, as this can only happen in the orginating
-          // cluster. A slave cluster receives the final value (not the delta)
-          // as a Put.
-          txid = this.log.appendNoSync(regionInfo,
-              this.htableDescriptor.getName(), walEdits,
-              HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
-        }
-
-        size = this.addAndGetGlobalMemstoreSize(size);
-        flush = isFlushSize(size);
-      } finally {
-        this.updatesLock.readLock().unlock();
-        releaseRowLock(lid);
-      }
-      if (writeToWAL) {
-        this.log.sync(txid); // sync the transaction log outside the rowlock
-      }
+      // this will unlock the row and the updatesLock.readLock
+      flush = atomicUpsert(newFamilyMap, lid, now, writeToWAL);
     } finally {
       closeRegionOperation();
     }
@@ -3858,17 +3892,16 @@
   public Result increment(Increment increment, Integer lockid,
       boolean writeToWAL)
   throws IOException {
-    // TODO: Use RWCC to make this set of increments atomic to reads
     byte [] row = increment.getRow();
     checkRow(row, "increment");
     TimeRange tr = increment.getTimeRange();
     boolean flush = false;
-    WALEdit walEdits = null;
+
     List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns());
-    List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
+    Map<byte[], List<KeyValue>> newFamilyMap = new TreeMap<byte[], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
+
     long now = EnvironmentEdgeManager.currentTimeMillis();
-    long size = 0;
-    long txid = 0;
 
     // Lock row
     startRegionOperation();
@@ -3876,72 +3909,44 @@
     try {
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
-      try {
-        // Process each family
-        for (Map.Entry<byte[], NavigableMap<byte[], Long>> family :
-          increment.getFamilyMap().entrySet()) {
-          Store store = stores.get(family.getKey());
+      // Process each family
+      for (Map.Entry<byte[], NavigableMap<byte[], Long>> family : increment
+          .getFamilyMap().entrySet()) {
 
-          // Get previous values for all columns in this family
-          Get get = new Get(row);
-          for (Map.Entry<byte[], Long> column : family.getValue().entrySet()) {
-            get.addColumn(family.getKey(), column.getKey());
-          }
-          get.setTimeRange(tr.getMin(), tr.getMax());
-          List<KeyValue> results = getLastIncrement(get);
+        // Get previous values for all columns in this family
+        Get get = new Get(row);
+        for (Map.Entry<byte[], Long> column : family.getValue().entrySet()) {
+          get.addColumn(family.getKey(), column.getKey());
+        }
+        get.setTimeRange(tr.getMin(), tr.getMax());
+        List<KeyValue> results = getLastIncrement(get);
 
-          // Iterate the input columns and update existing values if they were
-          // found, otherwise add new column initialized to the increment amount
-          int idx = 0;
-          for (Map.Entry<byte[], Long> column : family.getValue().entrySet()) {
-            long amount = column.getValue();
-            if (idx < results.size() &&
-                results.get(idx).matchingQualifier(column.getKey())) {
-              KeyValue kv = results.get(idx);
-              amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset());
-              idx++;
-            }
-
-            // Append new incremented KeyValue to list
-            KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
-                now, Bytes.toBytes(amount));
-            kvs.add(newKV);
-
-            // Append update to WAL
-            if (writeToWAL) {
-              if (walEdits == null) {
-                walEdits = new WALEdit();
-              }
-              walEdits.add(newKV);
-            }
+        // Iterate the input columns and update existing values if they were
+        // found, otherwise add new column initialized to the increment amount
+        List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
+        newFamilyMap.put(family.getKey(), kvs);
+        int idx = 0;
+        for (Map.Entry<byte[], Long> column : family.getValue().entrySet()) {
+          long amount = column.getValue();
+          if (idx < results.size()
+              && results.get(idx).matchingQualifier(column.getKey())) {
+            KeyValue kv = results.get(idx);
+            amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset());
+            idx++;
          }
-          // Write the KVs for this family into the store
-          size += store.upsert(kvs);
-          allKVs.addAll(kvs);
-          kvs.clear();
-        }
+          // Append new incremented KeyValue to list
+          KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
+              now, Bytes.toBytes(amount));
 
-        // Actually write to WAL now
-        if (writeToWAL) {
-          // Using default cluster id, as this can only happen in the orginating
-          // cluster. A slave cluster receives the final value (not the delta)
-          // as a Put.
-          txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
-              walEdits, HConstants.DEFAULT_CLUSTER_ID, now,
-              this.htableDescriptor);
+          kvs.add(newKV);
        }
+        allKVs.addAll(kvs);
+      }
 
-        size = this.addAndGetGlobalMemstoreSize(size);
-        flush = isFlushSize(size);
-      } finally {
-        this.updatesLock.readLock().unlock();
-        releaseRowLock(lid);
-      }
-      if (writeToWAL) {
-        this.log.sync(txid); // sync the transaction log outside the rowlock
-      }
+      // this will release the rowlock and updatesLock.readLock
+      atomicUpsert(newFamilyMap, lid, now, writeToWAL);
     } finally {
       closeRegionOperation();
     }
Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1195022)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy)
@@ -496,7 +496,6 @@
       MonitoredTask status) throws IOException {
     StoreFile.Writer writer;
-    String fileName;
     long flushed = 0;
     Path pathName;
     // Don't flush if there are no entries.
@@ -1829,24 +1828,20 @@
   }
 
   /**
-   * Adds or replaces the specified KeyValues.
+   * Removes duplicate KVs.
    * <p>
-   * For each KeyValue specified, if a cell with the same row, family, and
-   * qualifier exists in MemStore, it will be replaced. Otherwise, it will just
-   * be inserted to MemStore.
+   * For each KeyValue specified, if a cell with the same row, family, and
+   * qualifier, but a smaller memstoreTS, exists in MemStore, it will be removed.
    * <p>
-   * This operation is atomic on each KeyValue (row/family/qualifier) but not
-   * necessarily atomic across all of them.
+   * This operation is not atomic across multiple CFs, but since the KVs are
+   * shadowed by newer versions, it should not matter.
    * @param kvs
    * @return memstore size delta
-   * @throws IOException
    */
-  public long upsert(List<KeyValue> kvs)
-  throws IOException {
+  public long removeDups(List<KeyValue> kvs) {
     this.lock.readLock().lock();
     try {
-      // TODO: Make this operation atomic w/ RWCC
-      return this.memstore.upsert(kvs);
+      return this.memstore.removeDups(kvs);
     } finally {
       this.lock.readLock().unlock();
     }
Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1195022)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy)
@@ -535,28 +535,42 @@
   }
 
   /**
-   * Inserts the specified KeyValue into MemStore and deletes any existing
-   * versions of the same row/family/qualifier as the specified KeyValue.
+   * Remove duplicate KeyValues.
    * <p>
-   * First, the specified KeyValue is inserted into the Memstore.
+   * For each KeyValue, if a KeyValue with the same row, family, and qualifier
+   * already exists with a memstoreTS < the passed one, it will be removed.
    * <p>
-   * If there are any existing KeyValues in this MemStore with the same row,
-   * family, and qualifier, they are removed.
+   * This is called under row lock, so Get operations will still see updates
+   * atomically. Scans will only see each KeyValue update as atomic.
+   *
+   * @param kvs
+   * @return change in memstore size
+   */
+  public long removeDups(List<KeyValue> kvs) {
+    this.lock.readLock().lock();
+    try {
+      long removedSize = 0;
+      for (KeyValue kv : kvs) {
+        removedSize += removeDups(kv, kv.getMemstoreTS());
+      }
+      return removedSize;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Remove any duplicate KVs from the memstore.
+   * A duplicate is a KV with the same row, family, and qualifier,
+   * but with a memstoreTS < the passed one.
    * <p>
    * Callers must hold the read lock.
-   *
-   * @param kv
-   * @return change in size of MemStore
+   * @param kv
+   * @param memstoreTS
+   * @return the heap size of the KeyValues removed
    */
-  private long upsert(KeyValue kv) {
-    // Add the KeyValue to the MemStore
-    // Use the internalAdd method here since we (a) already have a lock
-    // and (b) cannot safely use the MSLAB here without potentially
-    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
-    // test that triggers the pathological case if we don't avoid MSLAB
-    // here.
-    long addedSize = internalAdd(kv);
-
+  private long removeDups(KeyValue kv, long memstoreTS) {
+    long removedSize = 0;
     // Get the KeyValues for the row/family/qualifier regardless of timestamp.
     // For this case we want to clean up any other puts
     KeyValue firstKv = KeyValue.createFirstOnRow(
@@ -580,11 +594,11 @@
       // if the qualifier matches and it's a put, remove it
       if (kv.matchingQualifier(cur)) {
-        // to be extra safe we only remove Puts that have a memstoreTS==0
+        // to be extra safe we only remove Puts and check the memstoreTS
         if (kv.getType() == KeyValue.Type.Put.getCode() &&
-            kv.getMemstoreTS() == 0) {
+            kv.getMemstoreTS() < memstoreTS) {
           // false means there was a change, so give us the size.
-          addedSize -= heapSizeChange(kv, true);
+          removedSize += heapSizeChange(kv, true);
           it.remove();
         }
       } else {
@@ -592,9 +606,34 @@
         break;
       }
     }
-    return addedSize;
+    return removedSize;
   }
 
+  /**
+   * Inserts the specified KeyValue into MemStore and deletes any existing
+   * versions of the same row/family/qualifier as the specified KeyValue.
+   * <p>
+   * First, the specified KeyValue is inserted into the Memstore.
+   * <p>
+   * If there are any existing KeyValues in this MemStore with the same row,
+   * family, and qualifier, they are removed.
+   * <p>
+   * Callers must hold the read lock.
+   *
+   * @param kv
+   * @return change in size of MemStore
+   */
+  private long upsert(KeyValue kv) {
+    // Add the KeyValue to the MemStore
+    // Use the internalAdd method here since we (a) already have a lock
+    // and (b) cannot safely use the MSLAB here without potentially
+    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
+    // test that triggers the pathological case if we don't avoid MSLAB
+    // here.
+    // Passing 1 as memstoreTS for backward compatibility.
+    return internalAdd(kv) - removeDups(kv, 1);
+  }
+
   /*
    * Immutable data structure to hold member found in set and the set it was
    * found in. Include set because it is carrying context.