diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e3946c1..22e65cd 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2317,25 +2317,47 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ boolean flush = false; // Lock row Integer lid = obtainRowLock(row); - long result = 0L; + long result = amount; try { Store store = stores.get(family); - // Determine what to do and perform increment on returned KV, no insertion - Store.ICVResult vas = - store.incrementColumnValue(row, family, qualifier, amount); - // Write incremented value to WAL before inserting + + // Get the old value: + Get get = new Get(row); + get.addColumn(family, qualifier); + List results = new ArrayList(); + NavigableSet qualifiers = new TreeSet(Bytes.BYTES_COMPARATOR); + qualifiers.add(qualifier); + store.get(get, qualifiers, results); + + if (!results.isEmpty()) { + byte [] oldValue = results.get(0).getValue(); + KeyValue kv = results.get(0); + byte [] buffer = kv.getBuffer(); + int valueOffset = kv.getValueOffset(); + result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG); + } + + // bulid the KeyValue now: + KeyValue newKv = new KeyValue(row, family, + qualifier, System.currentTimeMillis(), + Bytes.toBytes(result)); + + // now log it: if (writeToWAL) { long now = System.currentTimeMillis(); List edits = new ArrayList(1); - edits.add(vas.kv); + edits.add(newKv); this.log.append(regionInfo.getRegionName(), regionInfo.getTableDesc().getName(), edits, (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now); } - // Insert to the Store - store.add(vas.kv); - result = vas.value; - long size = this.memstoreSize.addAndGet(vas.sizeAdded); + + // Now request the ICV to the store, this will set the timestamp + // appropriately depending on if there is a value in memcache or not. + // returns the + long size = store.updateColumnValue(row, family, qualifier, result); + + size = this.memstoreSize.addAndGet(size); flush = isFlushSize(size); } finally { releaseRowLock(lid); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 0a4e717..646f2fe 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -496,6 +496,43 @@ public class MemStore implements HeapSize { this.lock.readLock().unlock(); } } + + /** + * Gets from either the memstore or the snapshop, and returns a code + * to let you know which is which. + * + * @param matcher + * @param result + * @return 1 == memstore, 2 == snapshot, 0 == none + */ + int getWithCode(QueryMatcher matcher, List result) throws IOException { + this.lock.readLock().lock(); + try { + boolean fromMemstore = internalGet(this.kvset, matcher, result); + if (fromMemstore || matcher.isDone()) + return 1; + + matcher.update(); + boolean fromSnapshot = internalGet(this.snapshot, matcher, result); + if (fromSnapshot || matcher.isDone()) + return 2; + + return 0; + } finally { + this.lock.readLock().unlock(); + } + } + + /** + * Small utility functions for use by Store.incrementColumnValue + * _only_ under the threat of pain and everlasting race conditions. + */ + void readLockLock() { + this.lock.readLock().lock(); + } + void readLockUnlock() { + this.lock.readLock().unlock(); + } /** * @@ -529,8 +566,8 @@ public class MemStore implements HeapSize { } return false; } - - /* + + /** * MemStoreScanner implements the KeyValueScanner. * It lets the caller scan the contents of a memstore -- both current * map and snapshot. diff --git a/src/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/java/org/apache/hadoop/hbase/regionserver/Store.java index c033758..0422a56 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -416,7 +416,7 @@ public class Store implements HConstants, HeapSize { lock.readLock().unlock(); } } - + /** * Adds a value to the memstore * @@ -1442,34 +1442,19 @@ public class Store implements HConstants, HeapSize { scanner.get(result); } - /* - * Data structure to hold incrementColumnValue result. - */ - static class ICVResult { - final long value; - final long sizeAdded; - final KeyValue kv; - - ICVResult(long value, long sizeAdded, KeyValue kv) { - this.value = value; - this.sizeAdded = sizeAdded; - this.kv = kv; - } - } - /** * Increments the value for the given row/family/qualifier * @param row * @param f * @param qualifier - * @param amount - * @return The new value. + * @param newValue the new value to set into memstore + * @return memstore size delta * @throws IOException */ - public ICVResult incrementColumnValue(byte [] row, byte [] f, - byte [] qualifier, long amount) + public long updateColumnValue(byte [] row, byte [] f, + byte [] qualifier, long newValue) throws IOException { - long value = 0; List result = new ArrayList(); KeyComparator keyComparator = this.comparator.getRawComparator(); + KeyValue kv = null; // Setting up the QueryMatcher Get get = new Get(row); NavigableSet qualifiers = @@ -1481,77 +1466,40 @@ public class Store implements HConstants, HeapSize { QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl, keyComparator, 1); - boolean newTs = true; - KeyValue kv = null; - // Read from memstore first: - this.memstore.internalGet(this.memstore.kvset, - matcher, result); - if (!result.isEmpty()) { - kv = result.get(0).clone(); - newTs = false; - } else { - // try the snapshot. - this.memstore.internalGet(this.memstore.snapshot, - matcher, result); - if (!result.isEmpty()) { - kv = result.get(0).clone(); - } - } + // lock memstore snapshot for this critical section: + this.lock.readLock().lock(); + memstore.readLockLock(); + try { + int memstoreCode = this.memstore.getWithCode(matcher, result); - if (kv != null) { - // Received early-out from memstore - // Make a copy of the KV and increment it - byte [] buffer = kv.getBuffer(); - int valueOffset = kv.getValueOffset(); - value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount; - Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0, - Bytes.SIZEOF_LONG); - if (newTs) { - long currTs = System.currentTimeMillis(); - if (currTs == kv.getTimestamp()) { - currTs++; // just in case something wacky happens. - } - byte [] stampBytes = Bytes.toBytes(currTs); - Bytes.putBytes(buffer, kv.getTimestampOffset(), stampBytes, 0, + if (memstoreCode != 0) { + // was in memstore (or snapshot) + kv = result.get(0).clone(); + byte [] buffer = kv.getBuffer(); + int valueOffset = kv.getValueOffset(); + Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(newValue), 0, Bytes.SIZEOF_LONG); + if (memstoreCode == 2) { + // from snapshot, assign new TS + long currTs = System.currentTimeMillis(); + if (currTs == kv.getTimestamp()) { + currTs++; // unlikely but catastrophic + } + Bytes.putBytes(buffer, kv.getTimestampOffset(), + Bytes.toBytes(currTs), 0, Bytes.SIZEOF_LONG); + } + } else { + kv = new KeyValue(row, f, qualifier, + System.currentTimeMillis(), + Bytes.toBytes(newValue)); } - return new ICVResult(value, 0, kv); - } - // Check if we even have storefiles - if(this.storefiles.isEmpty()) { - return createNewKeyValue(row, f, qualifier, value, amount); - } - - // Get storefiles for this store - List storefileScanners = new ArrayList(); - for (StoreFile sf : this.storefiles.descendingMap().values()) { - Reader r = sf.getReader(); - if (r == null) { - LOG.warn("StoreFile " + sf + " has a null Reader"); - continue; - } - storefileScanners.add(r.getScanner()); - } - - // StoreFileGetScan will handle reading this store's storefiles - StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher); - - // Run a GET scan and put results into the specified list - scanner.get(result); - if(result.size() > 0) { - value = Bytes.toLong(result.get(0).getValue()); + return add(kv); + // end lock + } finally { + memstore.readLockUnlock(); + this.lock.readLock().unlock(); } - return createNewKeyValue(row, f, qualifier, value, amount); } - private ICVResult createNewKeyValue(byte [] row, byte [] f, - byte [] qualifier, long value, long amount) { - long newValue = value + amount; - KeyValue newKv = new KeyValue(row, f, qualifier, - System.currentTimeMillis(), - Bytes.toBytes(newValue)); - return new ICVResult(newValue, newKv.heapSize(), newKv); - } - public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + (17 * ClassSize.REFERENCE) + (5 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN +