diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index ac67c96..bda6ce1 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -132,6 +132,10 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor "Work dirs must be specified"); Preconditions.checkArgument(shufflePort == 0 || (shufflePort > 1024 && shufflePort < 65536), "Shuffle Port must be betwee 1024 and 65535, or 0 for automatic selection"); + int outputFormatServicePort = HiveConf.getIntVar(daemonConf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT); + Preconditions.checkArgument(outputFormatServicePort == 0 + || (outputFormatServicePort > 1024 && outputFormatServicePort < 65536), + "OutputFormatService Port must be between 1024 and 65535, or 0 for automatic selection"); String hosts = HiveConf.getTrimmedVar(daemonConf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); if (hosts.startsWith("@")) { String zkHosts = HiveConf.getTrimmedVar(daemonConf, ConfVars.HIVE_ZOOKEEPER_QUORUM); @@ -165,6 +169,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor ", rpcListenerPort=" + srvPort + ", mngListenerPort=" + mngPort + ", webPort=" + webPort + + ", outputFormatSvcPort=" + outputFormatServicePort + ", workDirs=" + Arrays.toString(localDirs) + ", shufflePort=" + shufflePort + ", executorMemory=" + executorMemoryBytes + @@ -336,6 +341,7 @@ public void serviceStart() throws Exception { this.shufflePort.set(ShuffleHandler.get().getPort()); getConfig() .setInt(ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT.varname, ShuffleHandler.get().getPort()); + LlapOutputFormatService.initializeAndStart(getConfig()); super.serviceStart(); // Setup the actual ports in the configuration. diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index dde5be0..e394191 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -166,6 +166,7 @@ public void serviceInit(Configuration conf) throws IOException, InterruptedExcep int mngPort = 0; int shufflePort = 0; int webPort = 0; + int outputFormatServicePort = 0; boolean usePortsFromConf = conf.getBoolean("minillap.usePortsFromConf", false); LOG.info("MiniLlap configured to use ports from conf: {}", usePortsFromConf); if (usePortsFromConf) { @@ -173,7 +174,9 @@ public void serviceInit(Configuration conf) throws IOException, InterruptedExcep mngPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT); shufflePort = conf.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT); webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); + outputFormatServicePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT); } + HiveConf.setIntVar(conf, ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT, outputFormatServicePort); if (ownZkCluster) { miniZooKeeperCluster = new MiniZooKeeperCluster(); diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java index 6adbf7c..f852041 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.BytesWritable; @@ -39,9 +40,13 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.io.api.LlapProxy; + +import com.google.common.base.Preconditions; + import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; @@ -67,9 +72,12 @@ private static final Logger LOG = LoggerFactory.getLogger(LlapOutputFormat.class); - private static LlapOutputFormatService service; + private static final AtomicBoolean started = new AtomicBoolean(false); + private static final AtomicBoolean initing = new AtomicBoolean(false); + private static LlapOutputFormatService INSTANCE; + private final Map writers; - private final HiveConf conf; + private final Configuration conf; private static final int WAIT_TIME = 5; private static final int MAX_QUERY_ID_LENGTH = 256; @@ -78,23 +86,29 @@ private ChannelFuture listeningChannelFuture; private int port; - private LlapOutputFormatService() throws IOException { + private LlapOutputFormatService(Configuration conf) throws IOException { writers = new HashMap(); - conf = new HiveConf(); + this.conf = conf; } - public static LlapOutputFormatService get() throws IOException { - if (service == null) { - service = new LlapOutputFormatService(); - service.start(); + public static void initializeAndStart(Configuration conf) throws Exception { + if (!initing.getAndSet(true)) { + INSTANCE = new LlapOutputFormatService(conf); + INSTANCE.start(); + started.set(true); } - return service; + } + + public static LlapOutputFormatService get() throws IOException { + Preconditions.checkState(started.get(), + "LlapOutputFormatService must be started before invoking get"); + return INSTANCE; } public void start() throws IOException { LOG.info("Starting LlapOutputFormatService"); - int portFromConf = conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT); + int portFromConf = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT); eventLoopGroup = new NioEventLoopGroup(1); serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventLoopGroup); @@ -125,10 +139,10 @@ public void stop() throws IOException, InterruptedException { public RecordWriter getWriter(String id) throws IOException, InterruptedException { RecordWriter writer = null; - synchronized(service) { + synchronized(INSTANCE) { while ((writer = writers.get(id)) == null) { LOG.info("Waiting for writer for: "+id); - service.wait(); + INSTANCE.wait(); } } LOG.info("Returning writer for: "+id); @@ -147,7 +161,7 @@ public void channelRead0(ChannelHandlerContext ctx, String msg) { } private void registerReader(ChannelHandlerContext ctx, String id) { - synchronized(service) { + synchronized(INSTANCE) { LOG.debug("registering socket for: "+id); int bufSize = 128 * 1024; // configable? OutputStream stream = new ChannelOutputStream(ctx, id, bufSize); @@ -157,7 +171,7 @@ private void registerReader(ChannelHandlerContext ctx, String id) { // Add listener to handle any cleanup for when the connection is closed ctx.channel().closeFuture().addListener(new LlapOutputFormatChannelCloseListener(id)); - service.notifyAll(); + INSTANCE.notifyAll(); } } } @@ -173,7 +187,7 @@ private void registerReader(ChannelHandlerContext ctx, String id) { public void operationComplete(ChannelFuture future) throws Exception { RecordWriter writer = null; - synchronized (service) { + synchronized (INSTANCE) { writer = writers.remove(id); }