Index: modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java (revision b5c7b6f5ed81a9cee242e3abb052c29ec571a9a9) +++ modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java (revision ) @@ -32,6 +32,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.locks.LockSupport; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; @@ -534,11 +535,24 @@ * Stripe. */ private static class StripeConcurrentQueue extends Stripe { + /** State: active. */ + private static final int STATE_ACTIVE = 0; + + /** State: preparing to park. */ + private static final int STATE_PARKING = 1; + + /** State: parked. */ + private static final int STATE_PARKED = 2; + + /** State updater. */ + private static final AtomicIntegerFieldUpdater STATE_UPD = + AtomicIntegerFieldUpdater.newUpdater(StripeConcurrentQueue.class, "state"); + /** Queue. */ private final Queue queue = new ConcurrentLinkedQueue<>(); - /** */ - private volatile boolean parked; + /** State. */ + private volatile int state = STATE_ACTIVE; /** * @param igniteInstanceName Ignite instance name. @@ -569,32 +583,59 @@ return r; } - parked = true; - - try { - for (;;) { + for (;;) { + // Assume that work item has arrived. + if (Thread.interrupted()) + throw new InterruptedException(); + - r = queue.poll(); + r = queue.poll(); - if (r != null) - return r; + if (r != null) + return r; + // No tasks, mark PARKING state and re-try again. + state = STATE_PARKING; + + r = queue.poll(); + + if (r != null) { + state = STATE_ACTIVE; + + return r; + } + + // Confirm parked state, failure means that new task arrived concurrently. + if (STATE_UPD.compareAndSet(this, STATE_PARKING, STATE_PARKED)) LockSupport.park(); - if (Thread.interrupted()) - throw new InterruptedException(); + // Become active again. + if (state != STATE_ACTIVE) + state = STATE_ACTIVE; - } - } + } + } - finally { - parked = false; - } - } /** {@inheritDoc} */ void execute(Runnable cmd) { queue.add(cmd); - if (parked) + while (true) { + int state0 = state; + + if (state0 == STATE_ACTIVE) + // ACTIVE state guarantees that at least one more poll operation will be performed before park, + // so it is safe to exit. + return; + else { + if (STATE_UPD.compareAndSet(this, state0, STATE_ACTIVE)) { + if (state0 == STATE_PARKED) + // If we performed PARKED -> ACTIVE transition, thread must be unparked. Otherwise + // we performed PARKING -> ACTIVE transition which doesn't require unpark. - LockSupport.unpark(thread); + LockSupport.unpark(thread); + + return; + } + } + } } /** {@inheritDoc} */