diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 52990c5f05..5398259130 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -215,7 +215,6 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor ", configured(exec + io if enabled)=" + LlapUtil.humanReadableByteCount(memRequired)); this.shuffleHandlerConf = new Configuration(daemonConf); - this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, shufflePort); this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS, StringUtils.arrayToString(localDirs)); this.shuffleHandlerConf.setBoolean(ShuffleHandler.SHUFFLE_DIR_WATCHER_ENABLED, @@ -517,8 +516,8 @@ public static void main(String[] args) throws Exception { new String[0] : StringUtils.getTrimmedStrings(localDirList); int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT); int mngPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_MANAGEMENT_RPC_PORT); - int shufflePort = daemonConf - .getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT); + int shufflePort = HiveConf.getIntVar(daemonConf, + HiveConf.ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT); int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT); LlapDaemonInfo.initialize(appName, daemonConf); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java index 18a37a2adc..82006fdd9e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java @@ -58,6 +58,7 @@ import com.google.common.cache.RemovalNotification; import com.google.common.cache.Weigher; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hive.conf.HiveConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -173,9 +174,6 @@ private final ConcurrentMap userRsrc; private JobTokenSecretManager secretManager; - public static final String SHUFFLE_PORT_CONFIG_KEY = "llap.shuffle.port"; - public static final int DEFAULT_SHUFFLE_PORT = 15551; - public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = "llap.shuffle.connection-keep-alive.enable"; public static final boolean DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = true; @@ -345,11 +343,11 @@ public void start() throws Exception { } bootstrap.setPipelineFactory(pipelineFact); bootstrap.setOption("backlog", NetUtil.SOMAXCONN); - port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT); + port = HiveConf.getIntVar(conf, + HiveConf.ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT); Channel ch = bootstrap.bind(new InetSocketAddress(port)); accepted.add(ch); port = ((InetSocketAddress)ch.getLocalAddress()).getPort(); - conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port)); pipelineFact.SHUFFLE.setPort(port); if (dirWatcher != null) { dirWatcher.start(); @@ -649,7 +647,8 @@ public AttemptPathInfo load(AttemptPathIdentifier key) throws public Shuffle(Configuration conf) { this.conf = conf; indexCache = new IndexCache(conf); - this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT); + this.port = HiveConf.getIntVar(conf, + HiveConf.ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT); } public void setPort(int port) { diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index 6af230e7b4..83bf42578a 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -145,7 +145,8 @@ public void serviceInit(Configuration conf) throws IOException, InterruptedExcep if (usePortsFromConf) { rpcPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT); mngPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT); - shufflePort = conf.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT); + shufflePort = HiveConf.getIntVar(conf, + HiveConf.ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT); webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); outputFormatServicePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT); }