Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java	(revision 1403794)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java	(working copy)
@@ -108,7 +108,7 @@
   public void testIncrementMultiThreads() throws IOException {
     LOG.info("Starting test testIncrementMultiThreads");
-    initHRegion(tableName, getName(), fam1);
+    initHRegion(tableName, getName(), fam1, fam2);

     // create 100 threads, each will increment by its own quantity
     int numThreads = 100;
@@ -135,6 +135,8 @@
       }
     }
     assertICV(row, fam1, qual1, expectedTotal);
+    assertICV(row, fam1, qual2, expectedTotal*2);
+    assertICV(row, fam2, qual3, expectedTotal*3);
     LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
   }
@@ -184,18 +186,14 @@

   public static class Incrementer extends Thread {

     private final HRegion region;
-    private final int threadNumber;
     private final int numIncrements;
     private final int amount;

-    private int count;

     public Incrementer(HRegion region,
         int threadNumber, int amount, int numIncrements) {
       this.region = region;
-      this.threadNumber = threadNumber;
       this.numIncrements = numIncrements;
-      this.count = 0;
       this.amount = amount;
       setDaemon(true);
     }
@@ -206,12 +204,18 @@
       try {
         Increment inc = new Increment(row);
         inc.addColumn(fam1, qual1, amount);
-        Result result = region.increment(inc, null, true);
-        // LOG.info("thread:" + threadNumber + " iter:" + i);
+        inc.addColumn(fam1, qual2, amount*2);
+        inc.addColumn(fam2, qual3, amount*3);
+        region.increment(inc, null, true);
+
+        // verify: Make sure we only see completed increments
+        Get g = new Get(row);
+        Result result = region.get(g, null);
+        assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, Bytes.toLong(result.getValue(fam1, qual2)));
+        assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, Bytes.toLong(result.getValue(fam2, qual3)));
       } catch (IOException e) {
         e.printStackTrace();
       }
-      count++;
     }
   }
 }
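For reference, the invariant the updated test enforces can also be exercised through the client API. The sketch below is illustrative only and is not part of the patch; it assumes an HTableInterface handle named table and the same fam1/fam2, qual1-qual3 layout the test uses:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

/** Illustrative client-side version of the invariant the test asserts. */
final class IncrementInvariantCheck {
  static void incrementAndVerify(HTableInterface table, byte[] row,
      byte[] fam1, byte[] fam2, byte[] qual1, byte[] qual2, byte[] qual3)
      throws IOException {
    Increment inc = new Increment(row);
    inc.addColumn(fam1, qual1, 1);
    inc.addColumn(fam1, qual2, 2);
    inc.addColumn(fam2, qual3, 3);
    table.increment(inc);

    // With MVCC-atomic increments a reader can never observe this row
    // mid-update, so the 1:2:3 ratio must hold at all times.
    Result r = table.get(new Get(row));
    long v1 = Bytes.toLong(r.getValue(fam1, qual1));
    long v2 = Bytes.toLong(r.getValue(fam1, qual2));
    long v3 = Bytes.toLong(r.getValue(fam2, qual3));
    if (v2 != 2 * v1 || v3 != 3 * v1) {
      throw new AssertionError("torn increment: " + v1 + "/" + v2 + "/" + v3);
    }
  }
}

Before this patch, the read could land between the per-column memstore updates and the ratio check would fail intermittently; that is exactly the race the removed "partially complete" javadoc caveats described.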
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(revision 1403794)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(working copy)
@@ -2033,7 +2033,19 @@
     return this.region.regionInfo;
   }

-  @Override
+  /**
+   * Used in tests. TODO: Remove
+   *
+   * Updates the value for the given row/family/qualifier. This function will always be seen as
+   * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
+   * control necessary.
+   * @param row row to update
+   * @param f family to update
+   * @param qualifier qualifier to update
+   * @param newValue the new value to set into memstore
+   * @return memstore size delta
+   * @throws IOException
+   */
   public long updateColumnValue(byte [] row, byte [] f,
                                 byte [] qualifier, long newValue)
       throws IOException {
@@ -2054,11 +2066,10 @@
   }

   @Override
-  public long upsert(Iterable<KeyValue> kvs) throws IOException {
+  public long upsert(Iterable<KeyValue> kvs, long readpoint) throws IOException {
     this.lock.readLock().lock();
     try {
-      // TODO: Make this operation atomic w/ MVCC
-      return this.memstore.upsert(kvs);
+      return this.memstore.upsert(kvs, readpoint);
     } finally {
       this.lock.readLock().unlock();
     }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1403794)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -81,7 +81,6 @@
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
@@ -116,7 +115,7 @@
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.metrics.OperationMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
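Both HRegion hunks below rely on the same idiom: begin and immediately complete a throwaway MVCC transaction to wait out all earlier writers, then tag the new KeyValues with the write number of a second transaction so that readers see the whole batch at once. The following self-contained model of that handshake is a sketch for orientation only; the real MultiVersionConsistencyControl class differs in detail:

import java.util.ArrayDeque;
import java.util.Queue;

/** Simplified model of the MVCC begin/complete handshake; not the HBase class. */
final class MvccModel {
  static final class WriteEntry {
    final long writeNumber;
    boolean completed;
    WriteEntry(long writeNumber) { this.writeNumber = writeNumber; }
    long getWriteNumber() { return writeNumber; }
  }

  private long nextWriteNumber = 1;
  private long readPoint = 0; // everything <= readPoint is visible to readers
  private final Queue<WriteEntry> writeQueue = new ArrayDeque<WriteEntry>();

  synchronized WriteEntry beginMemstoreInsert() {
    WriteEntry e = new WriteEntry(nextWriteNumber++);
    writeQueue.add(e);
    return e;
  }

  synchronized void completeMemstoreInsert(WriteEntry e) throws InterruptedException {
    e.completed = true;
    // advance the read point over every completed entry at the head of the queue
    while (!writeQueue.isEmpty() && writeQueue.peek().completed) {
      readPoint = writeQueue.poll().writeNumber;
    }
    // block until our own write number is visible; begin() immediately
    // followed by complete() therefore acts as a barrier that drains all
    // earlier transactions - the trick the two hunks below use
    while (readPoint < e.writeNumber) {
      wait();
    }
    notifyAll();
  }

  synchronized long memstoreReadPoint() { return readPoint; }
}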
@@ -4544,11 +4543,7 @@
   // TODO: There's a lot of boiler plate code identical
   // to increment... See how to better unify that.
   /**
-   *
    * Perform one or more append operations on a row.
-   * <p>
-   * Appends performed are done under row lock but reads do not take locks out
-   * so this can be seen partially complete by gets and scans.
    *
    * @param append
    * @param lockid
@@ -4558,7 +4553,6 @@
    */
   public Result append(Append append, Integer lockid, boolean writeToWAL)
       throws IOException {
-    // TODO: Use MVCC to make this set of appends atomic to reads
     byte[] row = append.getRow();
     checkRow(row, "append");
     boolean flush = false;
@@ -4575,109 +4569,120 @@
     try {
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
+      // wait for all prior MVCC transactions to finish - while we hold the row lock
+      // (so that we are guaranteed to see the latest state)
+      WriteEntry w = mvcc.beginMemstoreInsert();
+      mvcc.completeMemstoreInsert(w);
+      // now start my own transaction
+      w = mvcc.beginMemstoreInsert();
       try {
-        long now = EnvironmentEdgeManager.currentTimeMillis();
-        // Process each family
-        for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap()
-            .entrySet()) {
-
-          Store store = stores.get(family.getKey());
-          List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
-
-          // Get previous values for all columns in this family
-          Get get = new Get(row);
-          for (KeyValue kv : family.getValue()) {
-            get.addColumn(family.getKey(), kv.getQualifier());
-          }
-          List<KeyValue> results = get(get, false);
-
-          // Iterate the input columns and update existing values if they were
-          // found, otherwise add new column initialized to the append value
-
-          // Avoid as much copying as possible. Every byte is copied at most
-          // once.
-          // Would be nice if KeyValue had scatter/gather logic
-          int idx = 0;
-          for (KeyValue kv : family.getValue()) {
-            KeyValue newKV;
-            if (idx < results.size()
-                && results.get(idx).matchingQualifier(kv.getBuffer(),
-                    kv.getQualifierOffset(), kv.getQualifierLength())) {
-              KeyValue oldKv = results.get(idx);
-              // allocate an empty kv once
-              newKV = new KeyValue(row.length, kv.getFamilyLength(),
-                  kv.getQualifierLength(), now, KeyValue.Type.Put,
-                  oldKv.getValueLength() + kv.getValueLength());
-              // copy in the value
-              System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
-                  newKV.getBuffer(), newKV.getValueOffset(),
-                  oldKv.getValueLength());
-              System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
-                  newKV.getBuffer(),
-                  newKV.getValueOffset() + oldKv.getValueLength(),
-                  kv.getValueLength());
-              idx++;
-            } else {
-              // allocate an empty kv once
-              newKV = new KeyValue(row.length, kv.getFamilyLength(),
-                  kv.getQualifierLength(), now, KeyValue.Type.Put,
-                  kv.getValueLength());
-              // copy in the value
-              System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
-                  newKV.getBuffer(), newKV.getValueOffset(),
-                  kv.getValueLength());
-            }
-            // copy in row, family, and qualifier
-            System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
-                newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
-            System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
-                newKV.getBuffer(), newKV.getFamilyOffset(),
-                kv.getFamilyLength());
-            System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
-                newKV.getBuffer(), newKV.getQualifierOffset(),
-                kv.getQualifierLength());
-
-            kvs.add(newKV);
-
-            // Append update to WAL
-            if (writeToWAL) {
-              if (walEdits == null) {
-                walEdits = new WALEdit();
-              }
-              walEdits.add(newKV);
-            }
-          }
-
-          //store the kvs to the temporary memstore before writing HLog
-          tempMemstore.put(store, kvs);
-        }
-
-        // Actually write to WAL now
-        if (writeToWAL) {
-          // Using default cluster id, as this can only happen in the orginating
-          // cluster. A slave cluster receives the final value (not the delta)
-          // as a Put.
-          txid = this.log.appendNoSync(regionInfo,
-              this.htableDescriptor.getName(), walEdits,
-              HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
-              this.htableDescriptor);
-        }
-
-        //Actually write to Memstore now
-        for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
-          Store store = entry.getKey();
-          size += store.upsert(entry.getValue());
-          allKVs.addAll(entry.getValue());
-        }
-        size = this.addAndGetGlobalMemstoreSize(size);
-        flush = isFlushSize(size);
-      } finally {
-        this.updatesLock.readLock().unlock();
-        releaseRowLock(lid);
-      }
-      if (writeToWAL) {
-        syncOrDefer(txid); // sync the transaction log outside the rowlock
-      }
+        try {
+          long now = EnvironmentEdgeManager.currentTimeMillis();
+          // Process each family
+          for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap()
+              .entrySet()) {
+
+            Store store = stores.get(family.getKey());
+            List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
+
+            // Get previous values for all columns in this family
+            Get get = new Get(row);
+            for (KeyValue kv : family.getValue()) {
+              get.addColumn(family.getKey(), kv.getQualifier());
+            }
+            List<KeyValue> results = get(get, false);
+
+            // Iterate the input columns and update existing values if they were
+            // found, otherwise add new column initialized to the append value
+
+            // Avoid as much copying as possible. Every byte is copied at most
+            // once.
+            // Would be nice if KeyValue had scatter/gather logic
+            int idx = 0;
+            for (KeyValue kv : family.getValue()) {
+              KeyValue newKV;
+              if (idx < results.size()
+                  && results.get(idx).matchingQualifier(kv.getBuffer(),
+                      kv.getQualifierOffset(), kv.getQualifierLength())) {
+                KeyValue oldKv = results.get(idx);
+                // allocate an empty kv once
+                newKV = new KeyValue(row.length, kv.getFamilyLength(),
+                    kv.getQualifierLength(), now, KeyValue.Type.Put,
+                    oldKv.getValueLength() + kv.getValueLength());
+                // copy in the value
+                System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
+                    newKV.getBuffer(), newKV.getValueOffset(),
+                    oldKv.getValueLength());
+                System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
+                    newKV.getBuffer(),
+                    newKV.getValueOffset() + oldKv.getValueLength(),
+                    kv.getValueLength());
+                idx++;
+              } else {
+                // allocate an empty kv once
+                newKV = new KeyValue(row.length, kv.getFamilyLength(),
+                    kv.getQualifierLength(), now, KeyValue.Type.Put,
+                    kv.getValueLength());
+                // copy in the value
+                System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
+                    newKV.getBuffer(), newKV.getValueOffset(),
+                    kv.getValueLength());
+              }
+              // copy in row, family, and qualifier
+              System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
+                  newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
+              System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
+                  newKV.getBuffer(), newKV.getFamilyOffset(),
+                  kv.getFamilyLength());
+              System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
+                  newKV.getBuffer(), newKV.getQualifierOffset(),
+                  kv.getQualifierLength());
+
+              newKV.setMemstoreTS(w.getWriteNumber());
+              kvs.add(newKV);
+
+              // Append update to WAL
+              if (writeToWAL) {
+                if (walEdits == null) {
+                  walEdits = new WALEdit();
+                }
+                walEdits.add(newKV);
+              }
+            }
+
+            //store the kvs to the temporary memstore before writing HLog
+            tempMemstore.put(store, kvs);
+          }
+
+          // Actually write to WAL now
+          if (writeToWAL) {
+            // Using default cluster id, as this can only happen in the originating
+            // cluster. A slave cluster receives the final value (not the delta)
+            // as a Put.
+            txid = this.log.appendNoSync(regionInfo,
+                this.htableDescriptor.getName(), walEdits,
+                HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
+                this.htableDescriptor);
+          }
+
+          //Actually write to Memstore now
+          for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
+            Store store = entry.getKey();
+            size += store.upsert(entry.getValue(), getSmallestReadPoint());
+            allKVs.addAll(entry.getValue());
+          }
+          size = this.addAndGetGlobalMemstoreSize(size);
+          flush = isFlushSize(size);
+        } finally {
+          this.updatesLock.readLock().unlock();
+          releaseRowLock(lid);
+        }
+        if (writeToWAL) {
+          syncOrDefer(txid); // sync the transaction log outside the rowlock
+        }
+      } finally {
+        mvcc.completeMemstoreInsert(w);
+      }
     } finally {
       closeRegionOperation();
     }
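Between the two rewritten methods it is worth spelling out the ordering they now share. The skeleton below uses stand-in interfaces (Wal, MemstoreSink, RowLock, Mvcc are not HBase types); only the sequence of steps mirrors the patch:

import java.util.List;

/** Skeleton of the new write path; all types here are hypothetical stand-ins. */
final class WritePathOrderSketch {
  interface Mvcc { long begin(); void complete(long writeNumber); }
  interface Wal { long appendNoSync(List<byte[]> edits); void sync(long txid); }
  interface RowLock { void lock(); void unlock(); }
  interface MemstoreSink { void upsert(List<byte[]> kvs, long smallestReadPoint); }

  static void readModifyWrite(RowLock rowLock, Mvcc mvcc, Wal wal,
      MemstoreSink memstore, List<byte[]> newKvs, long smallestReadPoint) {
    rowLock.lock();                                  // 1. serialize writers on this row
    long barrier = mvcc.begin();                     // 2. drain all earlier MVCC
    mvcc.complete(barrier);                          //    transactions: the read sees fresh state
    long w = mvcc.begin();                           // 3. start our own transaction
    long txid = 0;
    try {
      try {
        // 4. read current values and build newKvs tagged with w (elided)
        txid = wal.appendNoSync(newKvs);             // 5. WAL edit first, not yet synced
        memstore.upsert(newKvs, smallestReadPoint);  // 6. memstore; KVs still invisible
      } finally {
        rowLock.unlock();                            // 7. drop the row lock before the sync
      }
      wal.sync(txid);                                // 8. make the WAL edit durable
    } finally {
      mvcc.complete(w);                              // 9. publish: all KVs appear at once
    }
  }
}

Note the design choice the patch makes: the WAL sync is deliberately kept outside the row lock, and the MVCC transaction is completed only after the sync, so readers never surface an edit that could still be lost.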
@@ -4696,11 +4701,7 @@
   }

   /**
-   *
    * Perform one or more increment operations on a row.
-   * <p>
-   * Increments performed are done under row lock but reads do not take locks
-   * out so this can be seen partially complete by gets and scans.
    * @param increment
    * @param lockid
    * @param writeToWAL
@@ -4710,7 +4711,6 @@
   public Result increment(Increment increment, Integer lockid,
       boolean writeToWAL)
   throws IOException {
-    // TODO: Use MVCC to make this set of increments atomic to reads
     byte [] row = increment.getRow();
     checkRow(row, "increment");
     TimeRange tr = increment.getTimeRange();
@@ -4728,84 +4728,95 @@
     try {
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
+      // wait for all prior MVCC transactions to finish - while we hold the row lock
+      // (so that we are guaranteed to see the latest state)
+      WriteEntry w = mvcc.beginMemstoreInsert();
+      mvcc.completeMemstoreInsert(w);
+      // now start my own transaction
+      w = mvcc.beginMemstoreInsert();
       try {
-        long now = EnvironmentEdgeManager.currentTimeMillis();
-        // Process each family
-        for (Map.Entry<byte [], NavigableMap<byte [], Long>> family :
-            increment.getFamilyMap().entrySet()) {
-
-          Store store = stores.get(family.getKey());
-          List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
-
-          // Get previous values for all columns in this family
-          Get get = new Get(row);
-          for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
-            get.addColumn(family.getKey(), column.getKey());
-          }
-          get.setTimeRange(tr.getMin(), tr.getMax());
-          List<KeyValue> results = get(get, false);
-
-          // Iterate the input columns and update existing values if they were
-          // found, otherwise add new column initialized to the increment amount
-          int idx = 0;
-          for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
-            long amount = column.getValue();
-            if (idx < results.size() &&
-                results.get(idx).matchingQualifier(column.getKey())) {
-              KeyValue kv = results.get(idx);
-              if(kv.getValueLength() == Bytes.SIZEOF_LONG) {
-                amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG);
-              } else {
-                // throw DoNotRetryIOException instead of IllegalArgumentException
-                throw new DoNotRetryIOException(
-                    "Attempted to increment field that isn't 64 bits wide");
-              }
-              idx++;
-            }
-
-            // Append new incremented KeyValue to list
-            KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
-                now, Bytes.toBytes(amount));
-            kvs.add(newKV);
-
-            // Prepare WAL updates
-            if (writeToWAL) {
-              if (walEdits == null) {
-                walEdits = new WALEdit();
-              }
-              walEdits.add(newKV);
-            }
-          }
-
-          //store the kvs to the temporary memstore before writing HLog
-          tempMemstore.put(store, kvs);
-        }
-
-        // Actually write to WAL now
-        if (writeToWAL) {
-          // Using default cluster id, as this can only happen in the orginating
-          // cluster. A slave cluster receives the final value (not the delta)
-          // as a Put.
-          txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
-              walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
-              this.htableDescriptor);
-        }
-
-        //Actually write to Memstore now
-        for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
-          Store store = entry.getKey();
-          size += store.upsert(entry.getValue());
-          allKVs.addAll(entry.getValue());
-        }
-        size = this.addAndGetGlobalMemstoreSize(size);
-        flush = isFlushSize(size);
-      } finally {
-        this.updatesLock.readLock().unlock();
-        releaseRowLock(lid);
-      }
-      if (writeToWAL) {
-        syncOrDefer(txid); // sync the transaction log outside the rowlock
-      }
+        try {
+          long now = EnvironmentEdgeManager.currentTimeMillis();
+          // Process each family
+          for (Map.Entry<byte [], NavigableMap<byte [], Long>> family :
+              increment.getFamilyMap().entrySet()) {
+
+            Store store = stores.get(family.getKey());
+            List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
+
+            // Get previous values for all columns in this family
+            Get get = new Get(row);
+            for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
+              get.addColumn(family.getKey(), column.getKey());
+            }
+            get.setTimeRange(tr.getMin(), tr.getMax());
+            List<KeyValue> results = get(get, false);
+
+            // Iterate the input columns and update existing values if they were
+            // found, otherwise add new column initialized to the increment amount
+            int idx = 0;
+            for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
+              long amount = column.getValue();
+              if (idx < results.size() &&
+                  results.get(idx).matchingQualifier(column.getKey())) {
+                KeyValue kv = results.get(idx);
+                if(kv.getValueLength() == Bytes.SIZEOF_LONG) {
+                  amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG);
+                } else {
+                  // throw DoNotRetryIOException instead of IllegalArgumentException
+                  throw new DoNotRetryIOException(
+                      "Attempted to increment field that isn't 64 bits wide");
+                }
+                idx++;
+              }
+
+              // Append new incremented KeyValue to list
+              KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
+                  now, Bytes.toBytes(amount));
+              newKV.setMemstoreTS(w.getWriteNumber());
+              kvs.add(newKV);
+
+              // Prepare WAL updates
+              if (writeToWAL) {
+                if (walEdits == null) {
+                  walEdits = new WALEdit();
+                }
+                walEdits.add(newKV);
+              }
+            }
+
+            //store the kvs to the temporary memstore before writing HLog
+            tempMemstore.put(store, kvs);
+          }
+
+          // Actually write to WAL now
+          if (writeToWAL) {
+            // Using default cluster id, as this can only happen in the originating
+            // cluster. A slave cluster receives the final value (not the delta)
+            // as a Put.
+            txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
+                walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
+                this.htableDescriptor);
+          }
+
+          //Actually write to Memstore now
+          for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
+            Store store = entry.getKey();
+            size += store.upsert(entry.getValue(), getSmallestReadPoint());
+            allKVs.addAll(entry.getValue());
+          }
+          size = this.addAndGetGlobalMemstoreSize(size);
+          flush = isFlushSize(size);
+        } finally {
+          this.updatesLock.readLock().unlock();
+          releaseRowLock(lid);
+        }
+        if (writeToWAL) {
+          syncOrDefer(txid); // sync the transaction log outside the rowlock
+        }
+      } finally {
+        mvcc.completeMemstoreInsert(w);
+      }
     } finally {
       closeRegionOperation();
       long after = EnvironmentEdgeManager.currentTimeMillis();
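The readpoint argument threaded through Store and MemStore below is the region's smallest read point: the minimum over the current MVCC read point and the read points pinned by every open scanner. A minimal sketch of that computation, assuming a collection of per-scanner read points (HRegion.getSmallestReadPoint() plays this role in the real code):

import java.util.Collection;

/** Illustrative model of the bound below which upsert() may prune versions. */
final class SmallestReadPoint {
  static long of(long mvccReadPoint, Collection<Long> scannerReadPoints) {
    long smallest = mvccReadPoint;
    for (long rp : scannerReadPoints) {
      // every open scanner pins the read point it started with
      smallest = Math.min(smallest, rp);
    }
    // versions written before this point are invisible history for every
    // current and future reader, so shadowed duplicates below it can go
    return smallest;
  }
}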
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(revision 1403794)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(working copy)
@@ -75,20 +75,6 @@
       throws IOException;

   /**
-   * Updates the value for the given row/family/qualifier. This function will always be seen as
-   * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
-   * control necessary.
-   * @param row row to update
-   * @param f family to update
-   * @param qualifier qualifier to update
-   * @param newValue the new value to set into memstore
-   * @return memstore size delta
-   * @throws IOException
-   */
-  public long updateColumnValue(byte[] row, byte[] f, byte[] qualifier, long newValue)
-      throws IOException;
-
-  /**
    * Adds or replaces the specified KeyValues.
    * <p>
    * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
@@ -97,10 +83,11 @@
    * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
    * across all of them.
    * @param kvs
+   * @param readpoint readpoint below which we can safely remove duplicate KVs
    * @return memstore size delta
    * @throws IOException
    */
-  public long upsert(Iterable<KeyValue> kvs) throws IOException;
+  public long upsert(Iterable<KeyValue> kvs, long readpoint) throws IOException;

   /**
    * Adds a value to the memstore
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java	(revision 1403794)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java	(working copy)
@@ -435,6 +435,8 @@
   }

   /**
+   * Only used by tests. TODO: Remove
+   *
    * Given the specs of a column, update it, first by inserting a new record,
    * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS
    * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
@@ -449,7 +451,7 @@
    * @param now
    * @return Timestamp
    */
-  public long updateColumnValue(byte[] row,
+  long updateColumnValue(byte[] row,
                               byte[] family,
                               byte[] qualifier,
                               long newValue,
@@ -497,7 +499,7 @@
       // create or update (upsert) a new KeyValue with
       // 'now' and a 0 memstoreTS == immediately visible
       return upsert(Arrays.asList(
-          new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)))
+          new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))), 1L
       );
     } finally {
       this.lock.readLock().unlock();
@@ -519,15 +521,15 @@
    * atomically. Scans will only see each KeyValue update as atomic.
    *
    * @param kvs
+   * @param readpoint readpoint below which we can safely remove duplicate KVs
    * @return change in memstore size
    */
-  public long upsert(Iterable<KeyValue> kvs) {
+  public long upsert(Iterable<KeyValue> kvs, long readpoint) {
     this.lock.readLock().lock();
     try {
       long size = 0;
       for (KeyValue kv : kvs) {
-        kv.setMemstoreTS(0);
-        size += upsert(kv);
+        size += upsert(kv, readpoint);
       }
       return size;
     } finally {
@@ -549,7 +551,7 @@
    * @param kv
    * @return change in size of MemStore
    */
-  private long upsert(KeyValue kv) {
+  private long upsert(KeyValue kv, long readpoint) {
     // Add the KeyValue to the MemStore
     // Use the internalAdd method here since we (a) already have a lock
    // and (b) cannot safely use the MSLAB here without potentially
@@ -566,6 +568,7 @@
         kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
     SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
     Iterator<KeyValue> it = ss.iterator();
+    int versionsOlderThanReadpoint = 0;
     while ( it.hasNext() ) {
       KeyValue cur = it.next();

@@ -573,23 +576,23 @@
         // ignore the one just put in
         continue;
       }
-      // if this isn't the row we are interested in, then bail
-      if (!kv.matchingRow(cur)) {
-        break;
-      }
+      // check that this is the row and column we are interested in, otherwise bail
+      if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
+        // only remove Puts that concurrent scanners cannot possibly see (readpoint - 1)
+        if (cur.getType() == KeyValue.Type.Put.getCode() && cur.getMemstoreTS() < readpoint) {
+          if (versionsOlderThanReadpoint > 1) {
+            // if we get here we have already seen more than one version older than the
+            // readpoint, which means we can prove that no scanner will ever see any
+            // earlier versions

-      // if the qualifier matches and it's a put, remove it
-      if (kv.matchingQualifier(cur)) {
-
-        // to be extra safe we only remove Puts that have a memstoreTS==0
-        if (kv.getType() == KeyValue.Type.Put.getCode() &&
-            kv.getMemstoreTS() == 0) {
-          // false means there was a change, so give us the size.
-          addedSize -= heapSizeChange(kv, true);
-          it.remove();
+            // false means there was a change, so give us the size.
+            addedSize -= heapSizeChange(cur, true);
+            it.remove();
+          } else {
+            versionsOlderThanReadpoint++;
+          }
         }
       } else {
-        // past the column, done
+        // past the row or column, done
         break;
       }
     }
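To summarize the pruning rule introduced above: upsert() walks the versions of one row/column from newest to oldest and may remove a Put only when its memstoreTS is below the readpoint and more than one such older version has already been retained. A self-contained model of just that decision, ignoring KeyValue types and heap-size accounting; the list is assumed mutable (e.g. an ArrayList) and sorted newest-first:

import java.util.Iterator;
import java.util.List;

/** Simplified model of the versionsOlderThanReadpoint pruning in upsert(). */
final class UpsertPruneSketch {
  static void prune(List<Long> memstoreTimestamps, long readpoint) {
    int versionsOlderThanReadpoint = 0;
    Iterator<Long> it = memstoreTimestamps.iterator();
    if (it.hasNext()) {
      it.next(); // the KV we just inserted is never pruned here
    }
    while (it.hasNext()) {
      long memstoreTS = it.next();
      if (memstoreTS < readpoint) {
        if (versionsOlderThanReadpoint > 1) {
          // at least two older versions were kept, so no scanner at or above
          // the smallest read point can reach this one
          it.remove();
        } else {
          versionsOlderThanReadpoint++;
        }
      }
    }
  }
}

Keeping two older versions rather than one is a conservative choice; the key property is that the newest version visible to the slowest scanner always survives, which is why the old memstoreTS==0 trick and its "instantly visible" semantics could be dropped.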