diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f417247..b2e4750 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -65,6 +65,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -287,7 +288,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private Map coprocessorServiceHandlers = Maps.newHashMap(); // Track data size in all memstores - private final MemStoreSizing memStoreSize = new MemStoreSizing(); + private final ThreadSafeMemStoreSizing memStoreSize = new ThreadSafeMemStoreSizing(0, 0, 0); private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this); // Debug possible data loss due to WAL off @@ -1192,29 +1193,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * store */ public void incMemStoreSize(MemStoreSize memStoreSize) { + long dataSize = memStoreSize.getDataSize(); + long heapSize = memStoreSize.getHeapSize(); + long offheapSize = memStoreSize.getOffHeapSize(); if (this.rsAccounting != null) { - rsAccounting.incGlobalMemStoreSize(memStoreSize); + rsAccounting.incGlobalMemStoreSize(dataSize, heapSize, offheapSize); } - long dataSize; - synchronized (this.memStoreSize) { - this.memStoreSize.incMemStoreSize(memStoreSize); - dataSize = this.memStoreSize.getDataSize(); - } - checkNegativeMemStoreDataSize(dataSize, memStoreSize.getDataSize()); + + long size = this.memStoreSize.incMemStoreDataSize(dataSize); + this.memStoreSize.incMemStoreHeapSize(heapSize); + this.memStoreSize.incMemStoreOffHeapSize(offheapSize); + + checkNegativeMemStoreDataSize(size, dataSize); } public void decrMemStoreSize(MemStoreSize memStoreSize) { + long dataSize = memStoreSize.getDataSize(); + long heapSize = memStoreSize.getHeapSize(); + long offheapSize = memStoreSize.getOffHeapSize(); + + decrMemStoreSize(dataSize, heapSize, offheapSize); + } + + public void decrMemStoreSize(final long dataSize, final long heapSize, final long offheapSize) { if (this.rsAccounting != null) { - rsAccounting.decGlobalMemStoreSize(memStoreSize); + rsAccounting.decGlobalMemStoreSize(dataSize, heapSize, offheapSize); } - long size; - synchronized (this.memStoreSize) { - this.memStoreSize.decMemStoreSize(memStoreSize); - size = this.memStoreSize.getDataSize(); - } - checkNegativeMemStoreDataSize(size, -memStoreSize.getDataSize()); + + // The getDataSize/decMemStoreSize is not locked as the error logging + // is the best effort. + long size = this.memStoreSize.decMemStoreDataSize(dataSize); + this.memStoreSize.decMemStoreHeapSize(heapSize); + this.memStoreSize.decMemStoreOffHeapSize(offheapSize); + checkNegativeMemStoreDataSize(size, -dataSize); } + private void checkNegativeMemStoreDataSize(long memStoreDataSize, long delta) { // This is extremely bad if we make memStoreSize negative. Log as much info on the offending // caller as possible. (memStoreSize might be a negative value already -- freeing memory) @@ -1628,7 +1642,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.closed.set(true); if (!canFlush) { - this.decrMemStoreSize(new MemStoreSize(memStoreSize)); + this.decrMemStoreSize(memStoreSize.getDataSize(), memStoreSize.getHeapSize(), + memStoreSize.getOffHeapSize()); } else if (memStoreSize.getDataSize() != 0) { LOG.error("Memstore data size is " + memStoreSize.getDataSize()); } @@ -4108,8 +4123,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Path rootDir = FSUtils.getRootDir(conf); Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir); - SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(), - snapshotDir, desc, exnSnare); + SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(), snapshotDir, desc, + exnSnare); manifest.addRegion(this); } @@ -4174,6 +4189,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * We throw RegionTooBusyException if above memstore limit * and expect client to retry using some kind of backoff */ + //TODO: as this is at the critical path. void checkResources() throws RegionTooBusyException { // If catalog region, do not impose resource constraints or block updates. if (this.getRegionInfo().isMetaRegion()) return; @@ -4311,6 +4327,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return size.getHeapSize() + size.getOffHeapSize() > getMemStoreFlushSize(); } + private boolean isFlushSize(final long heapSize, final long offHeapSize) { + return heapSize + offHeapSize > getMemStoreFlushSize(); + } + /** * Read the edits put under this region by wal splitting process. Put * the recovered edits back up into this region. @@ -4600,7 +4620,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi memstoreSize); } incMemStoreSize(memstoreSize); - flush = isFlushSize(this.memStoreSize); + flush = isFlushSize(this.memStoreSize.getHeapSize(), this.memStoreSize.getOffHeapSize()); if (flush) { internalFlushcache(null, currentEditSeqId, stores.values(), status, false, FlushLifeCycleTracker.DUMMY); @@ -8459,7 +8479,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } private void requestFlushIfNeeded() throws RegionTooBusyException { - if(isFlushSize(memStoreSize)) { + if(isFlushSize(memStoreSize.getHeapSize(), memStoreSize.getOffHeapSize())) { requestFlush(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java index ec79e8d..8fb9b07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; @@ -50,47 +51,56 @@ public class MemStoreSize { * be in on heap or off heap area depending on the MSLAB and its configuration to be using on heap * or off heap LABs */ - protected volatile long dataSize; + //protected volatile long dataSize; + protected LongAdder dataSize; /** 'heapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead. * When Cells in on heap area, this will include the cells data size as well. */ - protected volatile long heapSize; + //protected volatile long heapSize; + protected LongAdder heapSize; /** off-heap size: the aggregated size of all data that is allocated off-heap including all * key-values that reside off-heap and the metadata that resides off-heap */ - protected volatile long offHeapSize; + //protected volatile long offHeapSize; + protected LongAdder offHeapSize; public MemStoreSize() { this(0L, 0L, 0L); } public MemStoreSize(long dataSize, long heapSize, long offHeapSize) { - this.dataSize = dataSize; - this.heapSize = heapSize; - this.offHeapSize = offHeapSize; + this.dataSize = new LongAdder(); + this.heapSize = new LongAdder(); + this.offHeapSize = new LongAdder(); + this.dataSize.add(dataSize); + this.heapSize.add(heapSize); + this.offHeapSize.add(offHeapSize); } protected MemStoreSize(MemStoreSize memStoreSize) { - this.dataSize = memStoreSize.dataSize; - this.heapSize = memStoreSize.heapSize; - this.offHeapSize = memStoreSize.offHeapSize; + this.dataSize = new LongAdder(); + this.heapSize = new LongAdder(); + this.offHeapSize = new LongAdder(); + this.dataSize.add(memStoreSize.getDataSize()); + this.heapSize.add(memStoreSize.getHeapSize()); + this.offHeapSize.add(memStoreSize.getOffHeapSize()); } public boolean isEmpty() { - return this.dataSize == 0 && this.heapSize == 0 && this.offHeapSize == 0; + return this.getDataSize() == 0 && this.getHeapSize() == 0 && this.getOffHeapSize() == 0; } public long getDataSize() { - return this.dataSize; + return this.dataSize.sum(); } public long getHeapSize() { - return this.heapSize; + return this.heapSize.sum(); } public long getOffHeapSize() { - return this.offHeapSize; + return this.offHeapSize.sum(); } @Override @@ -102,23 +112,23 @@ public class MemStoreSize { return false; } MemStoreSize other = (MemStoreSize) obj; - return this.dataSize == other.dataSize - && this.heapSize == other.heapSize - && this.offHeapSize == other.offHeapSize; + return this.getDataSize() == other.getDataSize() + && this.getHeapSize() == other.getHeapSize() + && this.getOffHeapSize() == other.getOffHeapSize(); } @Override public int hashCode() { - long h = 13 * this.dataSize; - h = h + 14 * this.heapSize; - h = h + 15 * this.offHeapSize; + long h = 13 * this.getDataSize(); + h = h + 14 * this.getHeapSize(); + h = h + 15 * this.getOffHeapSize(); return (int) h; } @Override public String toString() { - return "dataSize=" + this.dataSize - + " , heapSize=" + this.heapSize - + " , offHeapSize=" + this.offHeapSize; + return "dataSize=" + this.getDataSize() + + " , heapSize=" + this.getHeapSize() + + " , offHeapSize=" + this.getOffHeapSize(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java index 0b3e925..16cb76c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java @@ -53,9 +53,9 @@ public class MemStoreSizing extends MemStoreSize { } public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { - this.dataSize += dataSizeDelta; - this.heapSize += heapSizeDelta; - this.offHeapSize += offHeapSizeDelta; + this.dataSize.add(dataSizeDelta); + this.heapSize.add(heapSizeDelta); + this.offHeapSize.add(offHeapSizeDelta); } public void incMemStoreSize(MemStoreSize delta) { @@ -63,9 +63,9 @@ public class MemStoreSizing extends MemStoreSize { } public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { - this.dataSize -= dataSizeDelta; - this.heapSize -= heapSizeDelta; - this.offHeapSize -= offHeapSizeDelta; + this.dataSize.add((-1L) * dataSizeDelta); + this.heapSize.add((-1L) * heapSizeDelta); + this.offHeapSize.add((-1L) *offHeapSizeDelta); } public void decMemStoreSize(MemStoreSize delta) { @@ -73,9 +73,9 @@ public class MemStoreSizing extends MemStoreSize { } public void empty() { - this.dataSize = 0L; - this.heapSize = 0L; - this.offHeapSize = 0L; + this.dataSize.reset(); + this.heapSize.reset(); + this.offHeapSize.reset(); } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java index 1c627f7..37a388b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java @@ -141,12 +141,27 @@ public class RegionServerAccounting { globalMemStoreOffHeapSize.add(memStoreSize.getOffHeapSize()); } + public void incGlobalMemStoreSize(final long dataSize, final long heapSize, + final long offheapSize) { + globalMemStoreDataSize.add(dataSize); + globalMemStoreHeapSize.add(heapSize); + globalMemStoreOffHeapSize.add(offheapSize); + } + + public void decGlobalMemStoreSize(MemStoreSize memStoreSize) { globalMemStoreDataSize.add(-memStoreSize.getDataSize()); globalMemStoreHeapSize.add(-memStoreSize.getHeapSize()); globalMemStoreOffHeapSize.add(-memStoreSize.getOffHeapSize()); } + public void decGlobalMemStoreSize(final long dataSize, final long heapSize, + final long offheapSize) { + globalMemStoreDataSize.add((-1L) * dataSize); + globalMemStoreHeapSize.add((-1L) * heapSize); + globalMemStoreOffHeapSize.add((-1L) * offheapSize); + } + /** * Return true if we are above the memstore high water mark * @return the flushtype diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index 70074bf..188782b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -251,9 +251,7 @@ public abstract class Segment { */ //TODO protected void incSize(long delta, long heapOverhead, long offHeapOverhead) { - synchronized (this) { - this.segmentSize.incMemStoreSize(delta, heapOverhead, offHeapOverhead); - } + this.segmentSize.incMemStoreSize(delta, heapOverhead, offHeapOverhead); } public long getMinSequenceId() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java new file mode 100644 index 0000000..a22825e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.atomic.AtomicLong; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Accounting of current heap and data sizes. + * Allows read/write on data/heap size as opposed to {@Link MemStoreSize} which is read-only. + * For internal use. + * @see MemStoreSize + */ +@InterfaceAudience.Private +public class ThreadSafeMemStoreSizing { + private final AtomicLong dataSize = new AtomicLong(0); + private final AtomicLong heapSize = new AtomicLong(0); + private final AtomicLong offHeapSize = new AtomicLong(0); + + public ThreadSafeMemStoreSizing(final long dataSize, final long heapSize, + final long offHeapSize) { + this.dataSize.set(dataSize); + this.heapSize.set(heapSize); + this.offHeapSize.set(offHeapSize); + } + + public long sum() { + return dataSize.get() + heapSize.get() + offHeapSize.get(); + } + + public long getDataSize() { + return this.dataSize.get(); + } + + public long getHeapSize() { + return this.heapSize.get(); + } + + public long getOffHeapSize() { + return this.offHeapSize.get(); + } + + public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { + this.dataSize.addAndGet(dataSizeDelta); + this.heapSize.addAndGet(heapSizeDelta); + this.offHeapSize.addAndGet(offHeapSizeDelta); + } + + public void incMemStoreSize(MemStoreSize delta) { + incMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize()); + } + + public long incMemStoreDataSize(final long dataSizeDelta) { + return this.dataSize.addAndGet(dataSizeDelta); + } + + public long incMemStoreHeapSize(final long heapSizeDelta) { + return this.heapSize.addAndGet(heapSizeDelta); + } + + public long incMemStoreOffHeapSize(final long offHeapSizeDelta) { + return this.offHeapSize.addAndGet(offHeapSizeDelta); + } + + public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { + this.dataSize.addAndGet((-1L) * dataSizeDelta); + this.heapSize.addAndGet((-1L) * heapSizeDelta); + this.offHeapSize.addAndGet((-1L) * offHeapSizeDelta); + } + + public void decMemStoreSize(MemStoreSize delta) { + decMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize()); + } + + public long decMemStoreDataSize(final long dataSizeDelta) { + return this.dataSize.addAndGet(-dataSizeDelta); + } + + public long decMemStoreHeapSize(final long heapSizeDelta) { + return this.heapSize.addAndGet(-heapSizeDelta); + } + + public long decMemStoreOffHeapSize(final long offHeapSizeDelta) { + return this.offHeapSize.addAndGet(-offHeapSizeDelta); + } + +} \ No newline at end of file