Index: modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java (revision 69bdb5907ebb7c3afcae78fe648ffc33ad140cb3) +++ modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java (revision 69bdb5907ebb7c3afcae78fe648ffc33ad140cb3) @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; + +/** + * + */ +public final class MpscQueue extends MpscQueue_P1 { + /** Block marker. */ + private static final Node BLOCKED = new Node(null); + + /** Head. */ + protected final AtomicReference headRef = new AtomicReference<>(); + + /** Consumer thread. */ + private volatile Thread consumer; + + /** + * Poll element. + * + * @return Element. + */ + public E poll() { + if (tailSize > 0) + return (E)tail[--tailSize]; + + Node head = headRef.getAndSet(null); + + if (head != null) { + int size = head.size; + + if (tail.length < size) + tail = new Object[Integer.highestOneBit(size) << 1]; + + for (int i = 0; i < size ; i++) { + tail[i] = head.val; + head = head.next; + } + + return (E)tail[tailSize = size - 1]; + } + + return null; + } + + /** + * Take element. + * + * @return Element. + */ + public E take() throws InterruptedException { + if (tailSize > 0) + return (E)tail[--tailSize]; + + AtomicReference headRef = this.headRef; + + for (; ; ) { + Node head = headRef.getAndSet(null); + + if (head != null) { + int size = head.size; + + if (tail.length < size) + tail = new Object[Integer.highestOneBit(size) << 1]; + + for (int i = 0; i < size ; i++) { + tail[i] = head.val; + head = head.next; + } + + return (E)tail[tailSize = size - 1]; + } + + if (headRef.compareAndSet(null, BLOCKED)) { + do { + if (consumer == null) + consumer = Thread.currentThread(); + else + LockSupport.park(); + + if (Thread.interrupted()) + throw new InterruptedException(); + } + while (headRef.get() == BLOCKED); + } + } + } + + /** + * Offer element. + * + * @param e Element. + */ + public void offer(final E e) { + if (e == null) + throw new IllegalArgumentException("Null are not allowed."); + + final Node newItem = new Node(e); + + for (; ; ) { + Node head = headRef.get(); + + if (head == null || head == BLOCKED) { + newItem.next = null; + newItem.size = 1; + } + else { + newItem.next = head; + newItem.size = head.size + 1; + } + + if (headRef.compareAndSet(head, newItem)) { + if (head == BLOCKED) + LockSupport.unpark(consumer); + + break; + } + } + } + + /** + * @return Queue size. + */ + public int size() { + int size = tailSize; + + Node head = headRef.get(); + + if (head != null) + size += head.size; + + return size; + } + + @Override public String toString() { + return "MpscQueue[size=" + size() + ']'; + } +} + +abstract class MpscQueue_P1 extends MpscQueue_P2 { + boolean p000, p001, p002, p003, p004, p005, p006, p007, p008, p009, p010, p011, p012, p013, p014, p015; + boolean p016, p017, p018, p019, p020, p021, p022, p023, p024, p025, p026, p027, p028, p029, p030, p031; + boolean p032, p033, p034, p035, p036, p037, p038, p039, p040, p041, p042, p043, p044, p045, p046, p047; + boolean p048, p049, p050, p051, p052, p053, p054, p055, p056, p057, p058, p059, p060, p061, p062, p063; + boolean p064, p065, p066, p067, p068, p069, p070, p071, p072, p073, p074, p075, p076, p077, p078, p079; + boolean p080, p081, p082, p083, p084, p085, p086, p087, p088, p089, p090, p091, p092, p093, p094, p095; + boolean p096, p097, p098, p099, p100, p101, p102, p103, p104, p105, p106, p107, p108, p109, p110, p111; + boolean p112, p113, p114, p115, p116, p117, p118, p119, p120, p121, p122, p123, p124, p125, p126, p127; + boolean p128, p129, p130, p131, p132, p133, p134, p135, p136, p137, p138, p139, p140, p141, p142, p143; + boolean p144, p145, p146, p147, p148, p149, p150, p151, p152, p153, p154, p155, p156, p157, p158, p159; + boolean p160, p161, p162, p163, p164, p165, p166, p167, p168, p169, p170, p171, p172, p173, p174, p175; + boolean p176, p177, p178, p179, p180, p181, p182, p183, p184, p185, p186, p187, p188, p189, p190, p191; + boolean p192, p193, p194, p195, p196, p197, p198, p199, p200, p201, p202, p203, p204, p205, p206, p207; + boolean p208, p209, p210, p211, p212, p213, p214, p215, p216, p217, p218, p219, p220, p221, p222, p223; + boolean p224, p225, p226, p227, p228, p229, p230, p231, p232, p233, p234, p235, p236, p237, p238, p239; + boolean p240, p241, p242, p243, p244, p245, p246, p247, p248, p249, p250, p251, p252, p253, p254, p255; +} + +abstract class MpscQueue_P2 extends MpscQueue_P3 { + /** Tail. */ + protected Object[] tail = new Object[256]; + /** Tail size. */ + protected int tailSize; +} + +abstract class MpscQueue_P3 { + boolean p000, p001, p002, p003, p004, p005, p006, p007, p008, p009, p010, p011, p012, p013, p014, p015; + boolean p016, p017, p018, p019, p020, p021, p022, p023, p024, p025, p026, p027, p028, p029, p030, p031; + boolean p032, p033, p034, p035, p036, p037, p038, p039, p040, p041, p042, p043, p044, p045, p046, p047; + boolean p048, p049, p050, p051, p052, p053, p054, p055, p056, p057, p058, p059, p060, p061, p062, p063; + boolean p064, p065, p066, p067, p068, p069, p070, p071, p072, p073, p074, p075, p076, p077, p078, p079; + boolean p080, p081, p082, p083, p084, p085, p086, p087, p088, p089, p090, p091, p092, p093, p094, p095; + boolean p096, p097, p098, p099, p100, p101, p102, p103, p104, p105, p106, p107, p108, p109, p110, p111; + boolean p112, p113, p114, p115, p116, p117, p118, p119, p120, p121, p122, p123, p124, p125, p126, p127; + boolean p128, p129, p130, p131, p132, p133, p134, p135, p136, p137, p138, p139, p140, p141, p142, p143; + boolean p144, p145, p146, p147, p148, p149, p150, p151, p152, p153, p154, p155, p156, p157, p158, p159; + boolean p160, p161, p162, p163, p164, p165, p166, p167, p168, p169, p170, p171, p172, p173, p174, p175; + boolean p176, p177, p178, p179, p180, p181, p182, p183, p184, p185, p186, p187, p188, p189, p190, p191; + boolean p192, p193, p194, p195, p196, p197, p198, p199, p200, p201, p202, p203, p204, p205, p206, p207; + boolean p208, p209, p210, p211, p212, p213, p214, p215, p216, p217, p218, p219, p220, p221, p222, p223; + boolean p224, p225, p226, p227, p228, p229, p230, p231, p232, p233, p234, p235, p236, p237, p238, p239; + boolean p240, p241, p242, p243, p244, p245, p246, p247, p248, p249, p250, p251, p252, p253, p254, p255; +} + +final class Node { + /** Value. */ + final Object val; + + /** Next node. */ + Node next; + + /** */ + int size; + + /** + * Constructor. + * + * @param val Value. + */ + Node(Object val) { + this.val = val; + } +} + Index: modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java (revision 7313a327bdfcf1f9891455628fe3b8921c9ad51f) +++ modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java (revision 69bdb5907ebb7c3afcae78fe648ffc33ad140cb3) @@ -21,16 +21,12 @@ import java.util.Collections; import java.util.Deque; import java.util.List; -import java.util.Queue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -38,7 +34,6 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.managers.communication.GridIoPolicy; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -54,8 +49,13 @@ * Striped executor. */ public class StripedExecutor implements ExecutorService { + /** */ + private final int IGNITE_TASKS_STEALING_THRESHOLD = + IgniteSystemProperties.getInteger( + IgniteSystemProperties.IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD, 4); + /** Stripes. */ - private final Stripe[] stripes; + private final StripeWorker[] stripes; /** Threshold for starvation checks */ private final long threshold; @@ -106,18 +106,17 @@ boolean success = false; - stripes = new Stripe[cnt]; + stripes = new StripeWorker[cnt]; threshold = failureDetectionTimeout; this.log = log; + stealTasks &= IGNITE_TASKS_STEALING_THRESHOLD > 0; + try { - for (int i = 0; i < cnt; i++) { - stripes[i] = stealTasks - ? new StripeConcurrentQueue(igniteInstanceName, poolName, i, log, stripes, errHnd, gridWorkerLsnr) - : new StripeConcurrentQueue(igniteInstanceName, poolName, i, log, errHnd, gridWorkerLsnr); - } + for (int i = 0; i < cnt; i++) + stripes[i] = newWorker(igniteInstanceName, poolName, i, log, errHnd, gridWorkerLsnr, stealTasks); for (int i = 0; i < cnt; i++) stripes[i].start(); @@ -131,15 +130,22 @@ } finally { if (!success) { - for (Stripe stripe : stripes) + for (StripeWorker stripe : stripes) U.cancel(stripe); - for (Stripe stripe : stripes) + for (StripeWorker stripe : stripes) U.join(stripe, log); } } } + private StripeWorker newWorker(String igniteInstanceName, String poolName, int idx, + IgniteLogger log, IgniteInClosure errHnd, GridWorkerListener gridWorkerLsnr, boolean stealTasks) { + return stealTasks + ? new TaskStealingWorker(igniteInstanceName, poolName, idx, log, errHnd, gridWorkerLsnr) + : new SimpleWorker(igniteInstanceName, poolName, idx, log, errHnd, gridWorkerLsnr); + } + /** * Checks starvation in striped pool. Maybe too verbose * but this is needed to faster debug possible issues. @@ -149,7 +155,7 @@ public boolean detectStarvation() { boolean starvationDetected = false; - for (Stripe stripe : stripes) { + for (StripeWorker stripe : stripes) { boolean active = stripe.active; long lastStartedTs = stripe.lastStartedTs; @@ -234,7 +240,7 @@ /** {@inheritDoc} */ @Override public boolean isShutdown() { - for (Stripe stripe : stripes) { + for (StripeWorker stripe : stripes) { if (stripe != null && stripe.isCancelled()) return true; } @@ -244,7 +250,7 @@ /** {@inheritDoc} */ @Override public boolean isTerminated() { - for (Stripe stripe : stripes) { + for (StripeWorker stripe : stripes) { if (stripe.thread.getState() != Thread.State.TERMINATED) return false; } @@ -265,7 +271,7 @@ * Signals all stripes. */ private void signalStop() { - for (Stripe stripe : stripes) + for (StripeWorker stripe : stripes) U.cancel(stripe); } @@ -273,7 +279,7 @@ * Waits for all stripes to stop. */ private void awaitStop() { - for (Stripe stripe : stripes) + for (StripeWorker stripe : stripes) U.join(stripe, log); } @@ -283,7 +289,7 @@ public int queueSize() { int size = 0; - for (Stripe stripe : stripes) + for (StripeWorker stripe : stripes) size += stripe.queueSize(); return size; @@ -305,7 +311,7 @@ public long completedTasks() { long cnt = 0; - for (Stripe stripe : stripes) + for (StripeWorker stripe : stripes) cnt += stripe.completedCnt; return cnt; @@ -466,10 +472,8 @@ return S.toString(StripedExecutor.class, this); } - /** - * Stripe. - */ - private abstract static class Stripe extends GridWorker { + /** */ + private abstract static class StripeWorker extends GridWorker { /** */ private final String igniteInstanceName; @@ -502,7 +506,7 @@ * @param errHnd Exception handler. * @param gridWorkerLsnr listener to link with stripe worker. */ - public Stripe( + public StripeWorker( String igniteInstanceName, String poolName, int idx, @@ -610,47 +614,52 @@ /** {@inheritDoc} */ @Override public String toString() { - return S.toString(Stripe.class, this); + return S.toString(StripeWorker.class, this); } } - - /** - * Stripe. - */ - private static class StripeConcurrentQueue extends Stripe { - /** */ - private static final int IGNITE_TASKS_STEALING_THRESHOLD = - IgniteSystemProperties.getInteger( - IgniteSystemProperties.IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD, 4); - - /** Queue. */ - private final Queue queue; - /** */ - @GridToStringExclude - private final Stripe[] others; - + /** */ + private static final class SimpleWorker extends StripeWorker { /** */ - private volatile boolean parked; + private MpscQueue queue = new MpscQueue<>(); /** * @param igniteInstanceName Ignite instance name. * @param poolName Pool name. * @param idx Stripe index. * @param log Logger. - * @param errHnd Critical failure handler. + * @param errHnd Exception handler. * @param gridWorkerLsnr listener to link with stripe worker. */ - StripeConcurrentQueue( - String igniteInstanceName, - String poolName, - int idx, - IgniteLogger log, - IgniteInClosure errHnd, - GridWorkerListener gridWorkerLsnr - ) { - this(igniteInstanceName, poolName, idx, log, null, errHnd, gridWorkerLsnr); + public SimpleWorker(String igniteInstanceName, String poolName, int idx, IgniteLogger log, + IgniteInClosure errHnd, GridWorkerListener gridWorkerLsnr) { + super(igniteInstanceName, poolName, idx, log, errHnd, gridWorkerLsnr); + } + + @Override void execute(Runnable cmd) { + queue.offer(cmd); } + + @Override Runnable take() throws InterruptedException { + return queue.take(); + } + + @Override int queueSize() { + return queue.size(); + } + + @Override String queueToString() { + return queue.toString(); + } + } + + /** */ + private final class TaskStealingWorker extends StripeWorker { + /** Queue. */ + private final Deque queue; + + /** */ + private volatile boolean parked; /** * @param igniteInstanceName Ignite instance name. @@ -660,12 +669,11 @@ * @param errHnd Critical failure handler. * @param gridWorkerLsnr listener to link with stripe worker. */ - StripeConcurrentQueue( + TaskStealingWorker( String igniteInstanceName, String poolName, int idx, IgniteLogger log, - Stripe[] others, IgniteInClosure errHnd, GridWorkerListener gridWorkerLsnr ) { @@ -677,9 +685,7 @@ errHnd, gridWorkerLsnr); - this.others = others; - - this.queue = others == null ? new ConcurrentLinkedQueue() : new ConcurrentLinkedDeque(); + this.queue = new ConcurrentLinkedDeque<>(); } /** {@inheritDoc} */ @@ -687,9 +693,7 @@ Runnable r; for (int i = 0; i < 2048; i++) { - r = queue.poll(); - - if (r != null) + if ((r = queue.poll()) != null) return r; } @@ -697,33 +701,22 @@ try { for (;;) { - r = queue.poll(); - - if (r != null) - return r; + int size, shift = ThreadLocalRandom.current().nextInt(size = stripes.length); - if(others != null) { - int len = others.length; - int init = ThreadLocalRandom.current().nextInt(len); - int cur = init; - - while (true) { - if(cur != idx) { - Deque queue = (Deque) ((StripeConcurrentQueue) others[cur]).queue; + for (int i = 0; i < size; i++) { + Deque queue = ((TaskStealingWorker)stripes[(i + shift) % size]).queue; - if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (r = queue.pollLast()) != null) - return r; - } + if (queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (r = queue.pollLast()) != null) + return r; + } - if ((cur = (cur + 1) % len) == init) - break; - } - } - LockSupport.park(); if (Thread.interrupted()) throw new InterruptedException(); + + if ((r = queue.poll()) != null) + return r; } } finally { @@ -738,9 +731,13 @@ if (parked) LockSupport.unpark(thread); - if(others != null && queueSize() > IGNITE_TASKS_STEALING_THRESHOLD) { - for (Stripe other : others) { - if(((StripeConcurrentQueue)other).parked) + if(queueSize() > IGNITE_TASKS_STEALING_THRESHOLD) { + int size, shift = ThreadLocalRandom.current().nextInt(size = stripes.length); + + for (int i = 0; i < size; i++) { + TaskStealingWorker other = (TaskStealingWorker)stripes[(i + shift) % size]; + + if (other.parked) LockSupport.unpark(other.thread); } } @@ -758,126 +755,7 @@ /** {@inheritDoc} */ @Override public String toString() { - return S.toString(StripeConcurrentQueue.class, this, super.toString()); - } - } - - /** - * Stripe. - */ - private static class StripeConcurrentQueueNoPark extends Stripe { - /** Queue. */ - private final Queue queue = new ConcurrentLinkedQueue<>(); - - /** - * @param igniteInstanceName Ignite instance name. - * @param poolName Pool name. - * @param idx Stripe index. - * @param log Logger. - * @param errHnd Critical failure handler. - * @param gridWorkerLsnr listener to link with stripe worker. - */ - public StripeConcurrentQueueNoPark( - String igniteInstanceName, - String poolName, - int idx, - IgniteLogger log, - IgniteInClosure errHnd, - GridWorkerListener gridWorkerLsnr - ) { - super(igniteInstanceName, - poolName, - idx, - log, - errHnd, - gridWorkerLsnr); - } - - /** {@inheritDoc} */ - @Override Runnable take() { - for (;;) { - Runnable r = queue.poll(); - - if (r != null) - return r; - } - } - - /** {@inheritDoc} */ - @Override void execute(Runnable cmd) { - queue.add(cmd); - } - - /** {@inheritDoc} */ - @Override int queueSize() { - return queue.size(); - } - - /** {@inheritDoc} */ - @Override String queueToString() { - return String.valueOf(queue); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StripeConcurrentQueueNoPark.class, this, super.toString()); - } - } - - /** - * Stripe. - */ - private static class StripeConcurrentBlockingQueue extends Stripe { - /** Queue. */ - private final BlockingQueue queue = new LinkedBlockingQueue<>(); - - /** - * @param igniteInstanceName Ignite instance name. - * @param poolName Pool name. - * @param idx Stripe index. - * @param log Logger. - * @param errHnd Critical failure handler. - * @param gridWorkerLsnr listener to link with stripe worker. - */ - public StripeConcurrentBlockingQueue( - String igniteInstanceName, - String poolName, - int idx, - IgniteLogger log, - IgniteInClosure errHnd, - GridWorkerListener gridWorkerLsnr - ) { - super(igniteInstanceName, - poolName, - idx, - log, - errHnd, - gridWorkerLsnr); - } - - /** {@inheritDoc} */ - @Override Runnable take() throws InterruptedException { - return queue.take(); - } - - /** {@inheritDoc} */ - @Override void execute(Runnable cmd) { - queue.add(cmd); - } - - /** {@inheritDoc} */ - @Override int queueSize() { - return queue.size(); - } - - /** {@inheritDoc} */ - @Override String queueToString() { - return String.valueOf(queue); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StripeConcurrentBlockingQueue.class, this, super.toString()); + return S.toString(TaskStealingWorker.class, this, super.toString()); } } }