Index: src/java/org/apache/hadoop/hbase/regionserver/Memcache.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/Memcache.java (revision 673895) +++ src/java/org/apache/hadoop/hbase/regionserver/Memcache.java (working copy) @@ -59,7 +59,7 @@ // The currently active sorted map of edits. private volatile SortedMap<HStoreKey, byte[]> memcache = createSynchronizedSortedMap(); - + // Snapshot of memcache. Made for flusher. private volatile SortedMap<HStoreKey, byte[]> snapshot = createSynchronizedSortedMap(); @@ -158,11 +158,15 @@ * Write an update * @param key * @param value + * @return memcache size delta */ - void add(final HStoreKey key, final byte[] value) { + long add(final HStoreKey key, final byte[] value) { this.lock.readLock().lock(); try { + byte[] oldValue = this.memcache.remove(key); this.memcache.put(key, value); + return key.getSize() + (value == null ? 0 : value.length) - + (oldValue == null ? 0 : oldValue.length); } finally { this.lock.readLock().unlock(); } Index: src/java/org/apache/hadoop/hbase/regionserver/HStore.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 673895) +++ src/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy) @@ -620,11 +620,12 @@ * * @param key * @param value + * @return memcache size delta */ - protected void add(HStoreKey key, byte[] value) { + protected long add(HStoreKey key, byte[] value) { lock.readLock().lock(); try { - this.memcache.add(key, value); + return this.memcache.add(key, value); } finally { lock.readLock().unlock(); } @@ -726,7 +727,7 @@ now < curkey.getTimestamp() + ttl) { entries++; out.append(curkey, new ImmutableBytesWritable(bytes)); - flushed += HRegion.getEntrySize(curkey, bytes); + flushed += curkey.getSize() + (bytes == null ? 
0 : bytes.length); } else { if (LOG.isDebugEnabled()) { LOG.debug("internalFlushCache: " + curkey + @@ -1879,7 +1880,7 @@ public long getSize() { return storeSize; } - + ////////////////////////////////////////////////////////////////////////////// // File administration ////////////////////////////////////////////////////////////////////////////// Index: src/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 673895) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -504,7 +504,7 @@ long getMinSequenceId() { return this.minSequenceId; } - + /** @return a HRegionInfo object for this region */ public HRegionInfo getRegionInfo() { return this.regionInfo; @@ -1002,6 +1002,9 @@ // to do this for a moment. Its quick. The subsequent sequence id that // goes into the HLog after we've flushed all these snapshots also goes // into the info file that sits beside the flushed files. + // We also set the memcache size to zero here before we allow updates + // again so its value will represent the size of the updates received + // during the flush long sequenceId = -1L; this.updatesLock.writeLock().lock(); try { @@ -1009,6 +1012,7 @@ s.snapshot(); } sequenceId = log.startCacheFlush(); + this.memcacheSize.set(0); } finally { this.updatesLock.writeLock().unlock(); } @@ -1017,20 +1021,13 @@ // restart so hlog content can be replayed and put back into the memcache. // Otherwise, the snapshot content while backed up in the hlog, it will not // be part of the current running servers state. + long flushed = 0; try { // A. Flush memcache to all the HStores. // Keep running vector of all store files that includes both old and the // just-made new flush store file. for (HStore hstore: stores.values()) { - long flushed = hstore.flushCache(sequenceId); - // Subtract amount flushed. 
- long size = this.memcacheSize.addAndGet(-flushed); - if (size < 0) { - if (LOG.isDebugEnabled()) { - LOG.warn("Memcache size went negative " + size + "; resetting"); - } - this.memcacheSize.set(0); - } + flushed += hstore.flushCache(sequenceId); } } catch (Throwable t) { // An exception here means that the snapshot was not persisted. @@ -1068,7 +1065,7 @@ " in " + (System.currentTimeMillis() - startTime) + "ms, sequence id=" + sequenceId + ", " + - StringUtils.humanReadableInt(this.memcacheSize.get())); + StringUtils.humanReadableInt(flushed)); if (!regionInfo.isMetaRegion()) { this.historian.addRegionFlush(regionInfo, timeTaken); } @@ -1374,7 +1371,7 @@ */ private synchronized void checkResources() { boolean blocked = false; - + while (this.memcacheSize.get() >= this.blockingMemcacheSize) { if (!blocked) { LOG.info("Blocking updates for '" + Thread.currentThread().getName() + @@ -1538,9 +1535,8 @@ long size = 0; for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) { HStoreKey key = e.getKey(); - byte[] val = e.getValue(); - size = this.memcacheSize.addAndGet(getEntrySize(key, val)); - getStore(key.getColumn()).add(key, val); + size = this.memcacheSize.addAndGet( + getStore(key.getColumn()).add(key, e.getValue())); } flush = this.flushListener != null && !this.flushRequested && size > this.memcacheFlushSize; @@ -1578,19 +1574,6 @@ return this.stores.get(HStoreKey.getFamilyMapKey(column)); } - /* - * Calculate size of passed key/value pair. - * Used here when we update region to figure what to add to this.memcacheSize - * Also used in Store when flushing calculating size of flush. Both need to - * use same method making size calculation. - * @param key - * @param value - * @return Size of the passed key + value - */ - static long getEntrySize(final HStoreKey key, byte [] value) { - return key.getSize() + (value == null ? 
0 : value.length); - } - ////////////////////////////////////////////////////////////////////////////// // Support code //////////////////////////////////////////////////////////////////////////////