From 95c590623e2d458c63112ea7b30017c5c060ca4a Mon Sep 17 00:00:00 2001 From: stack Date: Wed, 18 May 2016 22:19:12 -0700 Subject: [PATCH] HBASE-12148 Remove TimeRangeTracker as point of contention when many threads writing a Store Let the min and max run independent of each other. Use atomiclongs with CAS rather than synchronize it all. --- .../hbase/regionserver/TimeRangeTracker.java | 87 ++++++++++++++-------- .../hbase/regionserver/TestTimeRangeTracker.java | 14 +++- 2 files changed, 65 insertions(+), 36 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java index f4175cc..82244c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -45,9 +46,9 @@ import org.apache.hadoop.io.Writable; @InterfaceAudience.Private public class TimeRangeTracker implements Writable { static final long INITIAL_MIN_TIMESTAMP = Long.MAX_VALUE; - long minimumTimestamp = INITIAL_MIN_TIMESTAMP; static final long INITIAL_MAX_TIMESTAMP = -1; - long maximumTimestamp = INITIAL_MAX_TIMESTAMP; + final AtomicLong minimumTimestamp = new AtomicLong(INITIAL_MIN_TIMESTAMP); + final AtomicLong maximumTimestamp = new AtomicLong(INITIAL_MAX_TIMESTAMP); /** * Default constructor. @@ -68,8 +69,8 @@ public class TimeRangeTracker implements Writable { } private void set(final long min, final long max) { - this.minimumTimestamp = min; - this.maximumTimestamp = max; + this.minimumTimestamp.set(min); + this.maximumTimestamp.set(max); } /** @@ -77,7 +78,7 @@ public class TimeRangeTracker implements Writable { * @return True if we initialized values */ private boolean init(final long l) { - if (this.minimumTimestamp != INITIAL_MIN_TIMESTAMP) return false; + if (getMin() != INITIAL_MIN_TIMESTAMP) return false; set(l, l); return true; } @@ -102,26 +103,47 @@ public class TimeRangeTracker implements Writable { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS", justification="Intentional") void includeTimestamp(final long timestamp) { - // Do test outside of synchronization block. Synchronization in here can be problematic - // when many threads writing one Store -- they can all pile up trying to add in here. - // Happens when doing big write upload where we are hammering on one region. - if (timestamp < this.minimumTimestamp) { - synchronized (this) { - if (!init(timestamp)) { - if (timestamp < this.minimumTimestamp) { - this.minimumTimestamp = timestamp; - } - } + if (!setMin(timestamp)) { + setMax(timestamp); + } + } + + private boolean setMin(final long timestamp) { + boolean result = false; + while (true) { + long current = getMin(); + if (timestamp >= current) { + break; + } + if (init(timestamp)) { + result = true; + break; + } + if (this.minimumTimestamp.compareAndSet(current, timestamp)) { + result = true; + break; + } + } + return result; + } + + private boolean setMax(final long timestamp) { + boolean result = false; + while (true) { + long current = getMax(); + if (timestamp <= current) { + break; + } + if (init(timestamp)) { + result = true; + break; } - } else if (timestamp > this.maximumTimestamp) { - synchronized (this) { - if (!init(timestamp)) { - if (this.maximumTimestamp < timestamp) { - this.maximumTimestamp = timestamp; - } - } + if (this.maximumTimestamp.compareAndSet(current, timestamp)) { + result = true; + break; } } + return result; } /** @@ -129,37 +151,36 @@ public class TimeRangeTracker implements Writable { * @param tr TimeRange * @return True if there is overlap, false otherwise */ - public synchronized boolean includesTimeRange(final TimeRange tr) { - return (this.minimumTimestamp < tr.getMax() && this.maximumTimestamp >= tr.getMin()); + public boolean includesTimeRange(final TimeRange tr) { + return getMin() < tr.getMax() && getMax() >= tr.getMin(); } /** * @return the minimumTimestamp */ public synchronized long getMin() { - return minimumTimestamp; + return minimumTimestamp.get(); } /** * @return the maximumTimestamp */ public synchronized long getMax() { - return maximumTimestamp; + return maximumTimestamp.get(); } - public synchronized void write(final DataOutput out) throws IOException { - out.writeLong(minimumTimestamp); - out.writeLong(maximumTimestamp); + public void write(final DataOutput out) throws IOException { + out.writeLong(getMin()); + out.writeLong(getMax()); } - public synchronized void readFields(final DataInput in) throws IOException { - this.minimumTimestamp = in.readLong(); - this.maximumTimestamp = in.readLong(); + public void readFields(final DataInput in) throws IOException { + set(in.readLong(), in.readLong()); } @Override public synchronized String toString() { - return "[" + minimumTimestamp + "," + maximumTimestamp + "]"; + return "[" + getMin() + "," + getMax() + "]"; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java index b68b1a8..ab797bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mortbay.log.Log; @Category({SmallTests.class}) public class TestTimeRangeTracker { @@ -125,14 +126,21 @@ public class TestTimeRangeTracker { public static void main(String[] args) throws InterruptedException { long start = System.currentTimeMillis(); final TimeRangeTracker trr = new TimeRangeTracker(); - final int threadCount = 5; - final int calls = 1024 * 1024 * 128; + final int threadCount = 32; + final int calls = 1024 * 1024 * 1024; Thread [] threads = new Thread[threadCount]; for (int i = 0; i < threads.length; i++) { Thread t = new Thread("" + i) { @Override public void run() { - for (int i = 0; i < calls; i++) trr.includeTimestamp(i); + try { + for (int i = 0; i < calls; i++) { + trr.includeTimestamp(i); + // Log.info(getName() + "" + i); + } + } catch (Throwable t) { + Log.info("", t); + } } }; t.start(); -- 2.6.1