Ignite / IGNITE-22349

Ignite.NET support for priority ordering of Compute jobs

Details

    • .NET: Add compute task sessions support.
    • Docs Required, Release Notes Required

    Description

      I want Apache Ignite to support priority ordering of Ignite.NET compute jobs on the same node.

      Analysis

      PriorityQueueCollisionSpi does priority ordering. The problem is that PriorityQueueCollisionSpi expects the user to provide job priorities
      via the ComputeTaskSession's "grid.task.priority" attribute, and ComputeTaskSession is not available in Ignite.NET.

      It looks like the requirement is to add an injectable ComputeTaskSession to Ignite.NET that exposes the SetAttributes operation, similar to how it works in Java.

      Reproducer

      I expect more or less ordered output from the reproducer below. The output may not be completely ordered, since that would require all the jobs to land on the server node in a single batch, and this reproducer cannot guarantee that:

      >>> Completed job with priority 0
      >>> Completed job with priority 9
      >>> Completed job with priority 8
      >>> Completed job with priority 7
      >>> Completed job with priority 6
      >>> Completed job with priority 5
      >>> Completed job with priority 4
      >>> Completed job with priority 3
      >>> Completed job with priority 2
      >>> Completed job with priority 1
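
      The shape of the expected output (priority 0 first, then 9 down to 1) can be illustrated with a toy model. This is only a sketch under stated assumptions, not how PriorityQueueCollisionSpi is implemented: it assumes the node runs one job at a time (as with parallelJobsNumber = 1), that the first job arrives while the node is idle, and that every other job arrives while the first one is still running. The PriorityQueueModel class and its CompletionOrder method are hypothetical names introduced for illustration; PriorityQueue<TElement, TPriority> requires .NET 6 or later.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: NOT the PriorityQueueCollisionSpi implementation.
public static class PriorityQueueModel
{
    /// <summary>
    /// Returns the order in which jobs with priorities 0..jobCount-1 complete
    /// when they are submitted in increasing priority order to a node that
    /// runs a single job at a time.
    /// </summary>
    public static List<int> CompletionOrder(int jobCount)
    {
        var completed = new List<int>(jobCount);
        if (jobCount <= 0)
        {
            return completed;
        }

        // .NET's PriorityQueue dequeues the smallest priority first by default,
        // so the comparer is reversed to dequeue the largest priority first.
        var waiting = new PriorityQueue<int, int>(
            Comparer<int>.Create((a, b) => b.CompareTo(a)));

        // Assumption: the first job (priority 0) finds the node idle and
        // starts immediately, before any other job has arrived.
        const int running = 0;

        // Assumption: all remaining jobs arrive while the first one is still
        // running, so they wait in the queue together.
        for (var priority = 1; priority < jobCount; priority++)
        {
            waiting.Enqueue(priority, priority);
        }

        // The running job completes first, then the queue drains by priority.
        completed.Add(running);
        while (waiting.TryDequeue(out var next, out _))
        {
            completed.Add(next);
        }

        return completed;
    }
}
```

      In this model, PriorityQueueModel.CompletionOrder(10) yields 0, 9, 8, 7, 6, 5, 4, 3, 2, 1. On a real cluster the jobs may reach the server in several batches, which is why the reproducer's output is only approximately ordered.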
      

      PriorityQueueCollisionSpiTest.cs:

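      // Assumed using directives. IComputeTaskSession, [TaskSessionResource] and [ComputeTaskSessionFullSupport]
      // are the API this issue proposes for Ignite.NET, so their namespaces below are a guess.
      using System;
      using System.Collections.Generic;
      using System.Linq;
      using System.Threading.Tasks;
      using Apache.Ignite.Core;
      using Apache.Ignite.Core.Compute;
      using Apache.Ignite.Core.Discovery.Tcp;
      using Apache.Ignite.Core.Discovery.Tcp.Static;
      using Apache.Ignite.Core.Resource;
      using Xunit;
      using Xunit.Abstractions;

      public class PriorityQueueCollisionSpiTest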
      {
          private static ITestOutputHelper? _output;
      
          public PriorityQueueCollisionSpiTest(ITestOutputHelper output)
          {
              _output = output;
          }
      
          /// <summary>
          /// Schedules jobs according to <see cref="IComputeTask{TArg,TJobRes,TRes}"/>'s priority. 
          /// </summary>
          [Fact]
          public void SchedulesJobsAccordingToTaskPriority()
          {
              // Given an Ignite cluster consisting of server and client nodes
              using var ignored = Ignition.Start(GetIgniteConfiguration("server1"));
              var igniteConfiguration = GetIgniteConfiguration("app1");
              igniteConfiguration.ClientMode = true;
              using var ignite = Ignition.Start(igniteConfiguration);
              var igniteCompute = ignite.GetCompute();
              
              // And the user asynchronously executes multiple tasks, each task starting a job having increasing priority
              const int jobCount = 10;
              ICollection<Task> futureResultCollection = new List<Task>(jobCount);
              for (var priority = 0; priority < jobCount; priority++)
              {
                  var task = new PriorityTask(priority);
                  var futureResult = igniteCompute.ExecuteAsync(task, jobCount);
                  futureResultCollection.Add(futureResult);
              }
      
              // When all the jobs complete
              Task.WaitAll(futureResultCollection.ToArray());
              
              // Then the ">>> Completed job with priority" console output demonstrates that the jobs completed in the
              // decreasing priority order, more or less.
          }
      
          private static IgniteConfiguration GetIgniteConfiguration(string igniteName) =>
              new()
              {
                  ConsistentId = igniteName,
                  IgniteInstanceName = igniteName,
                  SpringConfigUrl = "ignite-sandbox.xml",
                  DiscoverySpi = new TcpDiscoverySpi
                  {
                      IpFinder = new TcpDiscoveryStaticIpFinder {Endpoints = new List<string> {"127.0.0.1:48700"}},
                      LocalPort = 48700
                  },
                  FailureDetectionTimeout = TimeSpan.FromMinutes(10),
                  ClientFailureDetectionTimeout = TimeSpan.FromMinutes(10),
                  JvmOptions = new List<string> {"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005"}
              };
      
          /// <summary>
          /// <see cref="IComputeTask{TArg,TJobRes,TRes}"/> implementation that starts a single <see cref="IComputeJob{TRes}"/>
          /// with the specified priority.
          /// </summary>
          [ComputeTaskSessionFullSupport]
          private sealed class PriorityTask : ComputeTaskSplitAdapter<int, bool, bool>
          {
              private readonly int _priority;
              [TaskSessionResource] private IComputeTaskSession _taskSession;
      
              public PriorityTask(int priority)
              {
                  _priority = priority;
              }
      
              /// <inheritdoc />
              public override bool Reduce(IList<IComputeJobResult<bool>> results) => results.All(r => r.Data);
      
              /// <inheritdoc />
              protected override ICollection<IComputeJob<bool>> Split(int gridSize, int jobCount)
              {
                  IComputeJob<bool> job = new PriorityJob(_priority);
                  _taskSession.SetAttribute("grid.task.priority", _priority);
                  var actual = _taskSession.GetAttribute<int>("grid.task.priority");
                  Assert.Equal(_priority, actual);
                  return new List<IComputeJob<bool>> {job};
              }
          }
      
          /// <summary>
          /// <see cref="IComputeJob{TRes}"/> implementation with a priority indicator.
          /// </summary>
          private class PriorityJob : ComputeJobAdapter<bool>
          {
              private readonly int _priority;
      
              public PriorityJob(int priority)
              {
                  _priority = priority;
              }
      
              /// <inheritdoc />
              public override bool Execute()
              {
                  _output?.WriteLine($">>> Completed job with priority {_priority}");
                  return true;
              }
          }
      }
      

      ignite-sandbox.xml:

      <?xml version="1.0" encoding="utf-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:util="http://www.springframework.org/schema/util"
             xmlns:context="http://www.springframework.org/schema/context"
             xsi:schemaLocation="
              http://www.springframework.org/schema/beans
              http://www.springframework.org/schema/beans/spring-beans.xsd
              http://www.springframework.org/schema/util
              http://www.springframework.org/schema/util/spring-util-2.5.xsd"
      >
          <bean class="org.apache.ignite.configuration.IgniteConfiguration">
              <property name="collisionSpi">
                  <bean class="org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi">
                      <property name="parallelJobsNumber" value="1"/>
                      <property name="starvationPreventionEnabled" value="false"/>
                  </bean>
              </property>
          </bean>
      </beans>
      

            People

              Assignee: Alexey Kukushkin (kukushal)
              Reporter: Alexey Kukushkin (kukushal)
                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 50m