Subject: [PATCH] IGNITE-21047: Sql. Avoid spamming execution tasks when possible --- Index: modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java (revision ccd432f78b0e378d9d4710ff6931adc29e78f748) +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java (date 1706696672472) @@ -28,6 +28,7 @@ import java.util.TimeZone; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import org.apache.calcite.DataContext; @@ -292,6 +293,10 @@ return sharedState; } + private static final int EXEC_QUOTA = 20; + + private int quota = EXEC_QUOTA; + /** * Executes a query task. * @@ -301,19 +306,54 @@ if (isCancelled()) { return; } + assert Thread.currentThread().getName().contains("sql-execution-pool") : + "Unexpected execution thread " + Thread.currentThread().getName(); + + // Let the first N tasks to run in the caller thread. + if (quota > 0) { + quota --; + + try { + task.run(); + } catch (Throwable e) { + onError.accept(e); + } + } else { + quota = EXEC_QUOTA; - executor.execute(qryId, fragmentId(), () -> { - try { - if (!isCancelled()) { - task.run(); - } - } catch (Throwable e) { - onError.accept(e); + executor.execute(qryId, fragmentId(), () -> { + try { + if (!isCancelled()) { + task.run(); + } + } catch (Throwable e) { + onError.accept(e); - throw new IgniteInternalException(INTERNAL_ERR, "Unexpected exception", e); - } - }); + throw new IgniteInternalException(INTERNAL_ERR, "Unexpected exception", e); + } + }); + } } + + /** + * Executes a query task. + * + * @param task Query task. + */ + public void executeScheduled(RunnableX task, Consumer onError) { + executor.execute(qryId, fragmentId(), () -> { + taskScheduled++; + try { + if (!isCancelled()) { + task.run(); + } + } catch (Throwable e) { + onError.accept(e); + + throw new IgniteInternalException(INTERNAL_ERR, "Unexpected exception", e); + } + }); + } /** * Submits a Runnable task for execution and returns a Future representing that task. The Future's {@code get} method will return @@ -326,6 +366,7 @@ assert !isCancelled() : "Call submit after execution was cancelled."; return executor.submit(qryId, fragmentId(), () -> { + taskScheduled++; try { task.run(); } catch (Throwable e) { @@ -349,8 +390,10 @@ public boolean cancel() { boolean res = !cancelFlag.get() && cancelFlag.compareAndSet(false, true); - if (res && LOG.isTraceEnabled()) { - LOG.trace("Context cancelled [qryId={}, fragmentId={}]", qryId, fragmentId()); + if (res) { + if ( LOG.isTraceEnabled()) { + LOG.trace("Context cancelled [qryId={}, fragmentId={}]", qryId, fragmentId()); + } } return res; Index: modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java (revision ccd432f78b0e378d9d4710ff6931adc29e78f748) +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java (date 1706694877285) @@ -221,7 +221,7 @@ inBuff.add(row); if (inBuff.size() == inBufSize) { - context().execute(() -> { + context().executeScheduled(() -> { waiting = 0; push(); }, StorageScanNode.this::onError); @@ -231,7 +231,7 @@ /** {@inheritDoc} */ @Override public void onError(Throwable throwable) { - context().execute(() -> { + context().executeScheduled(() -> { throw throwable; }, StorageScanNode.this::onError); } @@ -239,7 +239,7 @@ /** {@inheritDoc} */ @Override public void onComplete() { - context().execute(() -> { + context().executeScheduled(() -> { activeSubscription = null; waiting = 0; Index: modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java (revision ccd432f78b0e378d9d4710ff6931adc29e78f748) +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java (date 1706694877310) @@ -256,7 +256,7 @@ throw new UnsupportedOperationException(modifyOp.name()); } - modifyResult.whenComplete((r, e) -> context().execute(() -> { + modifyResult.whenComplete((r, e) -> context().executeScheduled(() -> { if (e != null) { onError(e); Index: modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java (revision ccd432f78b0e378d9d4710ff6931adc29e78f748) +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java (date 1706694877283) @@ -117,7 +117,7 @@ requested = rowsCnt; if (!inLoop) { - context().execute(this::doPush, this::onError); + context().executeScheduled(this::doPush, this::onError); } } @@ -144,6 +144,8 @@ /** {@inheritDoc} */ @Override protected void rewindInternal() { +// System.err.println("REWIND " + Thread.currentThread().getName() + " "); +// new Exception().printStackTrace(System.err); remoteSources = null; requested = 0; for (String nodeName : srcNodeNames) { @@ -189,6 +191,7 @@ private void push() throws Exception { if (remoteSources == null) { +// System.err.println("SET REMOTE SOURCES " + Thread.currentThread().getName()); remoteSources = new ArrayList<>(srcNodeNames.size()); for (String nodeName : srcNodeNames) { @@ -282,6 +285,17 @@ inLoop = false; } + if (remoteSources == null) { + context().executeScheduled(this::push, this::onError); + return; + } + +// System.err.println("pushOrdered " + remoteSources + " " + Thread.currentThread().getName()); +// if (remoteSources == null) { +// new Exception().printStackTrace(System.err); +// } + assert remoteSources != null : "No remote sources: " + Thread.currentThread().getName(); + for (RemoteSource remote : remoteSources) { remote.requestNextBatchIfNeeded(); } @@ -356,7 +370,7 @@ ex ); - context().execute(() -> onError(wrapperEx), this::onError); + context().executeScheduled(() -> onError(wrapperEx), this::onError); } }); } @@ -367,9 +381,9 @@ */ public void onNodeLeft(String nodeName) { if (context().originatingNodeName().equals(nodeName) && srcNodeNames == null) { - context().execute(this::close, this::onError); + context().executeScheduled(this::close, this::onError); } else if (srcNodeNames != null && srcNodeNames.contains(nodeName)) { - context().execute(() -> onNodeLeft0(nodeName), this::onError); + context().executeScheduled(() -> onNodeLeft0(nodeName), this::onError); } } Index: modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java (revision ccd432f78b0e378d9d4710ff6931adc29e78f748) +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java (date 1706694877282) @@ -166,7 +166,7 @@ pendingRequests.clear(); } - source.context().execute(() -> { + source.context().executeScheduled(() -> { try { source.close(); @@ -249,7 +249,7 @@ */ private void scheduleTask() { if (!pendingRequests.isEmpty() && taskScheduled.compareAndSet(false, true)) { - source.context().execute(this::flush, source::onError); + source.context().executeScheduled(this::flush, source::onError); } } Index: modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java (revision ccd432f78b0e378d9d4710ff6931adc29e78f748) +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java (date 1706694877313) @@ -785,7 +785,7 @@ // we should complete dependency resolution on the same thread // that is going to be used for fragment execution. ExecutionContext context = createContext(initiatorNode, desc, txAttributes); - Executor exec = (r) -> context.execute(r::run, err -> handleError(err, initiatorNode, desc.fragmentId())); + Executor exec = (r) -> context.executeScheduled(r::run, err -> handleError(err, initiatorNode, desc.fragmentId())); try { IgniteRel treeRoot = relationalTreeFromJsonString(schemaVersion, fragmentString, ctx); Index: modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java (revision ccd432f78b0e378d9d4710ff6931adc29e78f748) +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java (date 1706694877281) @@ -20,6 +20,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Predicate; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.thread.LogUncaughtExceptionHandler; @@ -75,10 +76,27 @@ this.exHnd = exHnd; } + static StackTraceElement getCallLocation(Predicate ignoreCaller) { + StackTraceElement[] stackTrace = new Exception().getStackTrace(); + StackTraceElement caller = stackTrace[0]; + + // Skip this method and its first caller. + for (int i = 3; i < stackTrace.length; i++) { + caller = stackTrace[i]; + + if (!ignoreCaller.test(caller)) { + break; + } + } + + return caller; + } + /** {@inheritDoc} */ @Override public void execute(UUID qryId, long fragmentId, Runnable qryTask) { int commandIdx = hash(qryId, fragmentId); +// System.err.println("SUBMIT TASK " + qryId + " " + getCallLocation((s) -> false)); stripedThreadPoolExecutor.execute( () -> { try { Index: modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java (revision ccd432f78b0e378d9d4710ff6931adc29e78f748) +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java (date 1706694877284) @@ -130,7 +130,7 @@ /** {@inheritDoc} */ @Override public void closeInternal() { - context().execute(() -> sources().forEach(Commons::closeQuiet), this::onError); + context().executeScheduled(() -> sources().forEach(Commons::closeQuiet), this::onError); } /** {@inheritDoc} */ @@ -255,7 +255,7 @@ close(); } else if (inBuff.isEmpty() && waiting == 0) { int req = waiting = inBufSize; - context().execute(() -> source().request(req), this::onError); + context().executeScheduled(() -> source().request(req), this::onError); } if (!outBuff.isEmpty() || waiting == -1) { Index: modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java (revision ccd432f78b0e378d9d4710ff6931adc29e78f748) +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java (date 1706694877284) @@ -265,7 +265,7 @@ ex ); - context().execute(() -> onError(wrapperEx), this::onError); + context().executeScheduled(() -> onError(wrapperEx), this::onError); }); } @@ -344,7 +344,7 @@ */ public void onNodeLeft(String nodeName) { if (nodeName.equals(context().originatingNodeName())) { - context().execute(this::close, this::onError); + context().executeScheduled(this::close, this::onError); } }