Index: modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java (date 1551975508000) +++ modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java (revision ) @@ -294,6 +294,21 @@ assertEquals(2002, (int) Double.parseDouble(result.get(ClusterProperties.IGNITE_MEMORY_OVERHEAD_PER_NODE))); } + + /** + * @throws Exception If failed. + */ + @Test + public void testQueue() throws Exception { + // Default Queue check + Map result = props.toEnvs(); + assertEquals(ClusterProperties.DEFAULT_IGNITE_YARN_QUEUE, result.get(ClusterProperties.IGNITE_YARN_QUEUE)); + + props.yarnQueue("ignite"); + result = props.toEnvs(); + assertEquals("ignite", result.get(ClusterProperties.IGNITE_YARN_QUEUE)); + } + /** * @param host Host. * @param cpu Cpu count. Index: modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java (date 1551975508000) +++ modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java (revision ) @@ -48,7 +48,7 @@ /** */ public static final double DEFAULT_MEM_PER_NODE = 2048; - + /** * The minimum memory overhead: overhead is by default 0.1* MEMORY_PER_NODE, * with a minimum of DEFAULT_MINIMUM_MEM_OVERHEAD_PER_NODE. @@ -69,7 +69,7 @@ /** Memory limit. */ private double memPerNode = DEFAULT_MEM_PER_NODE; - + /** */ public static final String IGNITE_MEMORY_OVERHEAD_PER_NODE = "IGNITE_MEMORY_OVERHEAD_PER_NODE"; @@ -154,6 +154,15 @@ /** Url to ignite config. */ private Pattern hostnameConstraint = null; + /** Ignite Yarn Queue */ + public static final String IGNITE_YARN_QUEUE = "IGNITE_YARN_QUEUE"; + + /** Ignite Yarn default Queue */ + public static final String DEFAULT_IGNITE_YARN_QUEUE = "default"; + + /** Path to users libs. */ + private String yarnQueue = DEFAULT_IGNITE_YARN_QUEUE; + /** */ public ClusterProperties() { // No-op. @@ -193,33 +202,33 @@ * @param mem Memory. */ public void memoryPerNode(double mem) { - this.memPerNode = mem; + this.memPerNode = mem; } /** * @return Memory overhead for requested memory. */ public double memoryOverHeadPerNode() { - return memOverHeadPerNode; - } + return memOverHeadPerNode; + } /** * Sets memory overhead requested to YARN. * * @param memOverHeadPerNode Memory over head per node. */ - public void memoryOverHeadPerNode(double memOverHeadPerNode) { - this.memOverHeadPerNode = memOverHeadPerNode; - } - - /** - * @return Provide the total memory requested to ResourceManagers (memoryPerNode + memoryOverheadPerNode). - */ - public double totalMemoryPerNode(){ - return memoryPerNode() + memoryOverHeadPerNode(); - } + public void memoryOverHeadPerNode(double memOverHeadPerNode) { + this.memOverHeadPerNode = memOverHeadPerNode; + } + + /** + * @return Provide the total memory requested to ResourceManagers (memoryPerNode + memoryOverheadPerNode). + */ + public double totalMemoryPerNode(){ + return memoryPerNode() + memoryOverHeadPerNode(); + } - /** + /** * @return Instance count limit. */ public double instances() { @@ -242,6 +251,15 @@ this.hostnameConstraint = pattern; } + /** + * Sets Yarn Queue + * + * @param queue queue name. + */ + public void yarnQueue(String queue) { + this.yarnQueue = queue; + } + /** * @return Ignite version. */ @@ -312,6 +330,11 @@ return hostnameConstraint; } + /** + * @return Yarn Queue + */ + public String yarnQueue() { return yarnQueue; } + /** * Instantiate a ClusterProperties from a set of properties. * @@ -319,7 +342,7 @@ * @return Cluster properties. */ private static ClusterProperties fromProperties(Properties props) { - ClusterProperties prop = new ClusterProperties(); + ClusterProperties prop = new ClusterProperties(); prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, props, DEFAULT_CLUSTER_NAME); @@ -328,7 +351,7 @@ // The minimum memory overhead: overhead is by default 0.1* MEMORY_PER_NODE, // with a minimum of DEFAULT_MINIMUM_MEM_OVERHEAD_PER_NODE prop.memOverHeadPerNode = getDoubleProperty(IGNITE_MEMORY_OVERHEAD_PER_NODE, props, - Math.max( 0.1 * prop.memPerNode, DEFAULT_MINIMUM_MEM_OVERHEAD_PER_NODE)); + Math.max( 0.1 * prop.memPerNode, DEFAULT_MINIMUM_MEM_OVERHEAD_PER_NODE)); prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, DEFAULT_IGNITE_NODE_COUNT); prop.igniteUrl = getStringProperty(IGNITE_URL, props, null); @@ -340,6 +363,7 @@ prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, props, DEFAULT_IGNITE_RELEASES_DIR); prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, props, null); prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, props, null); + prop.yarnQueue = getStringProperty(IGNITE_YARN_QUEUE, props, DEFAULT_IGNITE_YARN_QUEUE); String pattern = getStringProperty(IGNITE_HOSTNAME_CONSTRAINT, props, null); @@ -354,7 +378,7 @@ return prop; } - + /** * @param config Path to config file. * @return Cluster configuration. @@ -407,6 +431,7 @@ envs.put(IGNITE_RELEASES_DIR, toEnvVal(igniteReleasesDir)); envs.put(IGNITE_CONFIG_XML, toEnvVal(igniteCfg)); envs.put(IGNITE_USERS_LIBS, toEnvVal(userLibs)); + envs.put(IGNITE_YARN_QUEUE, toEnvVal(yarnQueue)); if (hostnameConstraint != null) envs.put(IGNITE_HOSTNAME_CONSTRAINT, toEnvVal(hostnameConstraint.pattern())); Index: modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java (date 1551975508000) +++ modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java (revision ) @@ -87,17 +87,17 @@ // Upload the jar file to HDFS. Path appJar = IgniteYarnUtils.copyLocalToHdfs(fs, pathAppMasterJar, - props.igniteWorkDir() + File.separator + IgniteYarnUtils.JAR_NAME); + props.igniteWorkDir() + File.separator + IgniteYarnUtils.JAR_NAME); // Set up the container launch context for the application master ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); amContainer.setCommands( - Collections.singletonList( - Environment.JAVA_HOME.$() + "/bin/java -Xmx512m " + ApplicationMaster.class.getName() - + IgniteYarnUtils.SPACE + ignite.toUri() - + IgniteYarnUtils.YARN_LOG_OUT - ) + Collections.singletonList( + Environment.JAVA_HOME.$() + "/bin/java -Xmx512m " + ApplicationMaster.class.getName() + + IgniteYarnUtils.SPACE + ignite.toUri() + + IgniteYarnUtils.YARN_LOG_OUT + ) ); // Setup jar for ApplicationMaster @@ -141,7 +141,7 @@ appContext.setApplicationName("ignition"); // application name appContext.setAMContainerSpec(amContainer); appContext.setResource(capability); - appContext.setQueue("default"); // queue + appContext.setQueue(props.yarnQueue()); // queue // Submit application ApplicationId appId = appContext.getApplicationId(); @@ -154,15 +154,15 @@ YarnApplicationState appState = appReport.getYarnApplicationState(); while (appState == YarnApplicationState.NEW || - appState == YarnApplicationState.NEW_SAVING || - appState == YarnApplicationState.SUBMITTED || - appState == YarnApplicationState.ACCEPTED) { + appState == YarnApplicationState.NEW_SAVING || + appState == YarnApplicationState.SUBMITTED || + appState == YarnApplicationState.ACCEPTED) { TimeUnit.SECONDS.sleep(1L); appReport = yarnClient.getApplicationReport(appId); if (appState != YarnApplicationState.ACCEPTED - && appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) + && appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) log.log(Level.INFO, "Application {0} is ACCEPTED.", appId); appState = appReport.getYarnApplicationState(); @@ -202,7 +202,7 @@ */ private static void setupAppMasterEnv(Map envs, YarnConfiguration conf) { for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, - YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) Apps.addToEnvironment(envs, Environment.CLASSPATH.name(), c.trim(), File.pathSeparator);