Details
Description
I want Apache Ignite to support priority ordering of Ignite.NET compute jobs on the same node.
Analysis
PriorityQueueCollisionSpi does priority ordering. The problem is the PriorityQueueCollisionSpi expects the user to provide job priorities
via the ComputeTaskSession's "grid.task.priority" attribute and the ComputeTaskSession is not available in Ignite.NET.
It looks like the requirement is to add an injectable ComputeTaskSession in Ignite.NET exposing the SetAttributes operation similar to how it works in Java.
Reproducer
I expect more or less ordered output from the below reproducer. The output may not be completely ordered since completely ordered output requires all the jobs to land on the server node in single batch and this reproducer cannot guarantee that:
>>> Completed job with priority 0 >>> Completed job with priority 9 >>> Completed job with priority 8 >>> Completed job with priority 7 >>> Completed job with priority 6 >>> Completed job with priority 5 >>> Completed job with priority 4 >>> Completed job with priority 3 >>> Completed job with priority 2 >>> Completed job with priority 1
PriorityQueueCollisionSpiTest.cs:
public class PriorityQueueCollisionSpiTest { private static ITestOutputHelper? _output; public PriorityQueueCollisionSpiTest(ITestOutputHelper output) { _output = output; } /// <summary> /// Schedules jobs according to <see cref="IComputeTask{TArg,TJobRes,TRes}"/>'s priority. /// </summary> [Fact] public void SchedulesJobsAccordingToTaskPriority() { // Given an Ignite cluster consisting of server and client nodes using var ignored = Ignition.Start(GetIgniteConfiguration("server1")); var igniteConfiguration = GetIgniteConfiguration("app1"); igniteConfiguration.ClientMode = true; using var ignite = Ignition.Start(igniteConfiguration); var igniteCompute = ignite.GetCompute(); // And the user asynchronously executes multiple tasks, each task starting a job having increasing priority const int jobCount = 10; ICollection<Task> futureResultCollection = new List<Task>(jobCount); for (var priority = 0; priority < jobCount; priority++) { var task = new PriorityTask(priority); var futureResult = igniteCompute.ExecuteAsync(task, jobCount); futureResultCollection.Add(futureResult); } // When all the jobs complete Task.WaitAll(futureResultCollection.ToArray()); // Then the ">>> Completed job with priority" console output demonstrates that the jobs completed in the // decreasing priority order, more or less. } private static IgniteConfiguration GetIgniteConfiguration(string igniteName) => new() { ConsistentId = igniteName, IgniteInstanceName = igniteName, SpringConfigUrl = "ignite-sandbox.xml", DiscoverySpi = new TcpDiscoverySpi { IpFinder = new TcpDiscoveryStaticIpFinder {Endpoints = new List<string> {"127.0.0.1:48700"}}, LocalPort = 48700 }, FailureDetectionTimeout = TimeSpan.FromMinutes(10), ClientFailureDetectionTimeout = TimeSpan.FromMinutes(10), JvmOptions = new List<string> {"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005"} }; /// <summary> /// <see cref="IComputeTask{TArg,TJobRes,TRes}"/> implementation that single <see cref="IComputeJob{TRes}"/>s with /// the specified priority. /// </summary> [ComputeTaskSessionFullSupport] private sealed class PriorityTask : ComputeTaskSplitAdapter<int, bool, bool> { private readonly int _priority; [TaskSessionResource] private IComputeTaskSession _taskSession; public PriorityTask(int priority) { _priority = priority; } /// <inheritdoc /> public override bool Reduce(IList<IComputeJobResult<bool>> results) => results.All(r => r.Data); /// <inheritdoc /> protected override ICollection<IComputeJob<bool>> Split(int gridSize, int jobCount) { IComputeJob<bool> job = new PriorityJob(_priority); _taskSession.SetAttribute("grid.task.priority", _priority); var actual = _taskSession.GetAttribute<int>("grid.task.priority"); Assert.Equal(_priority, actual); return new List<IComputeJob<bool>> {job}; } } /// <summary> /// <see cref="IComputeJob{TRes}"/> implementation with a priority indicator. /// </summary> private class PriorityJob : ComputeJobAdapter<bool> { private readonly int _priority; public PriorityJob(int priority) { _priority = priority; } /// <inheritdoc /> public override bool Execute() { _output?.WriteLine($">>> Completed job with priority {_priority}"); return true; } } }
ignite-sandbox.xml:
<?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd" > <bean class="org.apache.ignite.configuration.IgniteConfiguration"> <property name="collisionSpi"> <bean class="org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi"> <property name="parallelJobsNumber" value="1"/> <property name="starvationPreventionEnabled" value="false"/> </bean> </property> </bean> </beans>
Attachments
Issue Links
- causes
-
IGNITE-22586 .NET: Fix build failure due to ComputeTaskSessionTest
- Resolved
- links to