diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f985ef8..ece8112 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2315,25 +2315,47 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ boolean flush = false; // Lock row Integer lid = obtainRowLock(row); - long result = 0L; + long result = amount; try { Store store = stores.get(family); - // Determine what to do and perform increment on returned KV, no insertion - Store.ICVResult vas = - store.incrementColumnValue(row, family, qualifier, amount); - // Write incremented value to WAL before inserting + + // Get the old value: + Get get = new Get(row); + get.addColumn(family, qualifier); + List results = new ArrayList(); + NavigableSet qualifiers = new TreeSet(Bytes.BYTES_COMPARATOR); + qualifiers.add(qualifier); + store.get(get, qualifiers, results); + + if (!results.isEmpty()) { + byte [] oldValue = results.get(0).getValue(); + KeyValue kv = results.get(0); + byte [] buffer = kv.getBuffer(); + int valueOffset = kv.getValueOffset(); + result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG); + } + + // bulid the KeyValue now: + KeyValue newKv = new KeyValue(row, family, + qualifier, System.currentTimeMillis(), + Bytes.toBytes(result)); + + // now log it: if (writeToWAL) { long now = System.currentTimeMillis(); List edits = new ArrayList(1); - edits.add(vas.kv); + edits.add(newKv); this.log.append(regionInfo.getRegionName(), regionInfo.getTableDesc().getName(), edits, (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now); } - // Insert to the Store - store.add(vas.kv); - result = vas.value; - long size = this.memstoreSize.addAndGet(vas.sizeAdded); + + // Now request the ICV to the store, this will set the timestamp + // appropriately depending on if there is a value in memcache or not. + // returns the + long size = store.updateColumnValue(row, family, qualifier, result); + + size = this.memstoreSize.addAndGet(size); flush = isFlushSize(size); } finally { releaseRowLock(lid); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 5f8e81d..76f776e 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -592,6 +592,17 @@ public class MemStore implements HeapSize { this.lock.readLock().unlock(); } } + + /** + * Small utility functions for use by Store.incrementColumnValue + * _only_ under the threat of pain and everlasting race conditions. + */ + void readLockLock() { + this.lock.readLock().lock(); + } + void readLockUnlock() { + this.lock.readLock().unlock(); + } /** * diff --git a/src/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/java/org/apache/hadoop/hbase/regionserver/Store.java index d05d253..e94b8b5 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -1591,37 +1591,22 @@ public class Store implements HConstants, HeapSize { scanner.get(result); } - /* - * Data structure to hold incrementColumnValue result. - */ - static class ICVResult { - final long value; - final long sizeAdded; - final KeyValue kv; - - ICVResult(long value, long sizeAdded, KeyValue kv) { - this.value = value; - this.sizeAdded = sizeAdded; - this.kv = kv; - } - } - /** * Increments the value for the given row/family/qualifier * @param row * @param f * @param qualifier - * @param amount - * @return The new value. + * @param newValue the new value to set into memstore + * @return memstore size delta * @throws IOException */ - public ICVResult incrementColumnValue(byte [] row, byte [] f, - byte [] qualifier, long amount) + public long updateColumnValue(byte [] row, byte [] f, + byte [] qualifier, long newValue) throws IOException { - long value = 0; List result = new ArrayList(); KeyComparator keyComparator = this.comparator.getRawComparator(); + KeyValue kv = null; // Setting up the QueryMatcher Get get = new Get(row); NavigableSet qualifiers = @@ -1630,64 +1615,40 @@ public class Store implements HConstants, HeapSize { QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl, keyComparator, 1); - int memstoreCode = this.memstore.getWithCode(matcher, result); - - if (memstoreCode != 0) { - // was in memstore (or snapshot) - KeyValue kv = result.get(0).clone(); - byte [] buffer = kv.getBuffer(); - int valueOffset = kv.getValueOffset(); - value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount; - Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0, - Bytes.SIZEOF_LONG); - if (memstoreCode == 2) { - // from snapshot, assign new TS - long currTs = System.currentTimeMillis(); - if (currTs == kv.getTimestamp()) { - currTs++; // unlikely but catastrophic + // lock memstore snapshot for this critical section: + memstore.readLockLock(); + try { + int memstoreCode = this.memstore.getWithCode(matcher, result); + + if (memstoreCode != 0) { + // was in memstore (or snapshot) + kv = result.get(0).clone(); + byte [] buffer = kv.getBuffer(); + int valueOffset = kv.getValueOffset(); + Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(newValue), 0, + Bytes.SIZEOF_LONG); + if (memstoreCode == 2) { + // from snapshot, assign new TS + long currTs = System.currentTimeMillis(); + if (currTs == kv.getTimestamp()) { + currTs++; // unlikely but catastrophic + } + Bytes.putBytes(buffer, kv.getTimestampOffset(), + Bytes.toBytes(currTs), 0, Bytes.SIZEOF_LONG); } - Bytes.putBytes(buffer, kv.getTimestampOffset(), - Bytes.toBytes(currTs), 0, Bytes.SIZEOF_LONG); + } else { + kv = new KeyValue(row, f, qualifier, + System.currentTimeMillis(), + Bytes.toBytes(newValue)); } - return new ICVResult(value, 0, kv); + return add(kv); + // end lock + } finally { + memstore.readLockUnlock(); } - // Check if we even have storefiles - if(this.storefiles.isEmpty()) { - return createNewKeyValue(row, f, qualifier, value, amount); - } - - // Get storefiles for this store - List storefileScanners = new ArrayList(); - for (StoreFile sf : this.storefiles.descendingMap().values()) { - Reader r = sf.getReader(); - if (r == null) { - LOG.warn("StoreFile " + sf + " has a null Reader"); - continue; - } - storefileScanners.add(r.getScanner()); - } - - // StoreFileGetScan will handle reading this store's storefiles - StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher); - - // Run a GET scan and put results into the specified list - scanner.get(result); - if(result.size() > 0) { - value = Bytes.toLong(result.get(0).getValue()); - } - return createNewKeyValue(row, f, qualifier, value, amount); } - private ICVResult createNewKeyValue(byte [] row, byte [] f, - byte [] qualifier, long value, long amount) { - long newValue = value + amount; - KeyValue newKv = new KeyValue(row, f, qualifier, - System.currentTimeMillis(), - Bytes.toBytes(newValue)); - return new ICVResult(newValue, newKv.heapSize(), newKv); - } - public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + (17 * ClassSize.REFERENCE) + (5 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN +