From 459bd1f2bd73361addfa9099abf911b3167fe53c Mon Sep 17 00:00:00 2001 From: Matt McCline Date: Fri, 9 Jul 2021 23:09:05 -0700 Subject: [PATCH] Replace Thrift's TThreadPoolServer with source copy TThreadPoolServer_Hive that catches Throwable instead of just an Exception to solve HIVE-25307 in in a differt way in our code line. --- .../cli/thrift/ThriftBinaryCLIService.java | 7 +- .../hadoop/hive/metastore/HiveMetaStore.java | 6 +- .../metastore/TThreadPoolServer_Hive.java | 326 ++++++++++++++++++ 3 files changed, 333 insertions(+), 6 deletions(-) create mode 100644 standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TThreadPoolServer_Hive.java diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index df2d3a7b71..ffae36c7d9 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.TThreadPoolServer_Hive; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; @@ -42,7 +43,7 @@ import org.apache.thrift.server.ServerContext; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TServerEventHandler; -import org.apache.thrift.server.TThreadPoolServer; +// import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; @@ -96,14 +97,14 @@ protected void initServer() { TimeUnit.SECONDS); int beBackoffSlotLength = (int) hiveConf .getTimeVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS); - TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket).processorFactory(processorFactory) + TThreadPoolServer_Hive.Args sargs = new TThreadPoolServer_Hive.Args(serverSocket).processorFactory(processorFactory) .transportFactory(transportFactory).protocolFactory(new TBinaryProtocol.Factory()) .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize)) .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS).beBackoffSlotLength(beBackoffSlotLength) .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS).executorService(executorService); // TCP Server - server = new TThreadPoolServer(sargs); + server = new TThreadPoolServer_Hive(sargs); server.setServerEventHandler(new TServerEventHandler() { @Override public ServerContext createContext(TProtocol input, TProtocol output) { diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index cd68cbe0c3..cddcb0b85c 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -176,7 +176,7 @@ import org.apache.thrift.server.ServerContext; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TServerEventHandler; -import org.apache.thrift.server.TThreadPoolServer; +// import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransport; @@ -8993,7 +8993,7 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, // initializes them. openConnections = Metrics.getOrCreateGauge(MetricsConstants.OPEN_CONNECTIONS); - TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket) + TThreadPoolServer_Hive.Args args = new TThreadPoolServer_Hive.Args(serverSocket) .processor(processor) .transportFactory(transFactory) .protocolFactory(protocolFactory) @@ -9001,7 +9001,7 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, .minWorkerThreads(minWorkerThreads) .maxWorkerThreads(maxWorkerThreads); - TServer tServer = new TThreadPoolServer(args); + TServer tServer = new TThreadPoolServer_Hive(args); TServerEventHandler tServerEventHandler = new TServerEventHandler() { @Override public void preServe() { diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TThreadPoolServer_Hive.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TThreadPoolServer_Hive.java new file mode 100644 index 0000000000..b2bf7451f1 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TThreadPoolServer_Hive.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.ServerContext; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TServerEventHandler; +import org.apache.thrift.transport.TSaslTransportException; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Server which uses Java's built in ThreadPool management to spawn off + * a worker pool that + * + */ +public class TThreadPoolServer_Hive extends TServer { + private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer_Hive.class.getName()); + + public static class Args extends AbstractServerArgs { + public int minWorkerThreads = 5; + public int maxWorkerThreads = Integer.MAX_VALUE; + public ExecutorService executorService; + public int stopTimeoutVal = 60; + public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; + public int requestTimeout = 20; + public TimeUnit requestTimeoutUnit = TimeUnit.SECONDS; + public int beBackoffSlotLength = 100; + public TimeUnit beBackoffSlotLengthUnit = TimeUnit.MILLISECONDS; + + public Args(TServerTransport transport) { + super(transport); + } + + public Args minWorkerThreads(int n) { + minWorkerThreads = n; + return this; + } + + public Args maxWorkerThreads(int n) { + maxWorkerThreads = n; + return this; + } + + public Args stopTimeoutVal(int n) { + stopTimeoutVal = n; + return this; + } + + public Args requestTimeout(int n) { + requestTimeout = n; + return this; + } + + public Args requestTimeoutUnit(TimeUnit tu) { + requestTimeoutUnit = tu; + return this; + } + //Binary exponential backoff slot length + public Args beBackoffSlotLength(int n) { + beBackoffSlotLength = n; + return this; + } + + //Binary exponential backoff slot time unit + public Args beBackoffSlotLengthUnit(TimeUnit tu) { + beBackoffSlotLengthUnit = tu; + return this; + } + + public Args executorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + } + + // Executor service for handling client connections + private ExecutorService executorService_; + + private final TimeUnit stopTimeoutUnit; + + private final long stopTimeoutVal; + + private final TimeUnit requestTimeoutUnit; + + private final long requestTimeout; + + private final long beBackoffSlotInMillis; + + private Random random = new Random(System.currentTimeMillis()); + + public TThreadPoolServer_Hive(Args args) { + super(args); + + stopTimeoutUnit = args.stopTimeoutUnit; + stopTimeoutVal = args.stopTimeoutVal; + requestTimeoutUnit = args.requestTimeoutUnit; + requestTimeout = args.requestTimeout; + beBackoffSlotInMillis = args.beBackoffSlotLengthUnit.toMillis(args.beBackoffSlotLength); + + executorService_ = args.executorService != null ? + args.executorService : createDefaultExecutorService(args); + } + + private static ExecutorService createDefaultExecutorService(Args args) { + SynchronousQueue executorQueue = + new SynchronousQueue(); + return new ThreadPoolExecutor(args.minWorkerThreads, + args.maxWorkerThreads, + args.stopTimeoutVal, + TimeUnit.SECONDS, + executorQueue); + } + + + public void serve() { + try { + serverTransport_.listen(); + } catch (TTransportException ttx) { + LOGGER.error("Error occurred during listening.", ttx); + return; + } + + // Run the preServe event + if (eventHandler_ != null) { + eventHandler_.preServe(); + } + + stopped_ = false; + setServing(true); + int failureCount = 0; + while (!stopped_) { + try { + TTransport client = serverTransport_.accept(); + WorkerProcess wp = new WorkerProcess(client); + + int retryCount = 0; + long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout); + while(true) { + try { + executorService_.execute(wp); + break; + } catch(Throwable t) { + if (t instanceof RejectedExecutionException) { + retryCount++; + try { + if (remainTimeInMillis > 0) { + //do a truncated 20 binary exponential backoff sleep + long sleepTimeInMillis = ((long) (random.nextDouble() * + (1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis; + sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis); + TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis); + remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis; + } else { + client.close(); + wp = null; + LOGGER.warn("Task has been rejected by ExecutorService " + retryCount + + " times till timedout, reason: " + t); + break; + } + } catch (InterruptedException e) { + LOGGER.warn("Interrupted while waiting to place client on executor queue."); + Thread.currentThread().interrupt(); + break; + } + } else if (t instanceof Error) { + LOGGER.error("ExecutorService threw error: " + t, t); + throw (Error)t; + } else { + //for other possible runtime errors from ExecutorService, should also not kill serve + LOGGER.warn("ExecutorService threw error: " + t, t); + break; + } + } + } + } catch (TTransportException ttx) { + if (!stopped_) { + ++failureCount; + LOGGER.warn("Transport error occurred during acceptance of message.", ttx); + } + } + } + + executorService_.shutdown(); + + // Loop until awaitTermination finally does return without a interrupted + // exception. If we don't do this, then we'll shut down prematurely. We want + // to let the executorService clear it's task queue, closing client sockets + // appropriately. + long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal); + long now = System.currentTimeMillis(); + while (timeoutMS >= 0) { + try { + executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); + break; + } catch (InterruptedException ix) { + long newnow = System.currentTimeMillis(); + timeoutMS -= (newnow - now); + now = newnow; + } + } + setServing(false); + } + + public void stop() { + stopped_ = true; + serverTransport_.interrupt(); + } + + private class WorkerProcess implements Runnable { + + /** + * Client that this services. + */ + private TTransport client_; + + /** + * Default constructor. + * + * @param client Transport to process + */ + private WorkerProcess(TTransport client) { + client_ = client; + } + + /** + * Loops on processing a client forever + */ + public void run() { + TProcessor processor = null; + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + + TServerEventHandler eventHandler = null; + ServerContext connectionContext = null; + + try { + processor = processorFactory_.getProcessor(client_); + inputTransport = inputTransportFactory_.getTransport(client_); + outputTransport = outputTransportFactory_.getTransport(client_); + inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); + outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); + + eventHandler = getEventHandler(); + if (eventHandler != null) { + connectionContext = eventHandler.createContext(inputProtocol, outputProtocol); + } + // we check stopped_ first to make sure we're not supposed to be shutting + // down. this is necessary for graceful shutdown. + while (true) { + + if (eventHandler != null) { + eventHandler.processContext(connectionContext, inputTransport, outputTransport); + } + + if(stopped_ || !processor.process(inputProtocol, outputProtocol)) { + break; + } + } + } catch (TSaslTransportException ttx) { + // Something thats not SASL was in the stream, continue silently + } catch (TTransportException ttx) { + // Assume the client died and continue silently + } catch (TException tx) { + LOGGER.error("Thrift error occurred during processing of message.", tx); + } catch (Throwable x) { + // *CHANGE* Replaced Exception with Throwable to catch the RuntimeException that is killing Hive Server 2. + if (x instanceof RuntimeException && x.getCause() != null) { + // Fix the problem where a RuntimeException was thrown because an interface had no throws clause. + Throwable cause = x.getCause(); + LOGGER.debug("Suppressing RuntimeException and using cause " + cause.getClass().getSimpleName() + " instead"); + LOGGER.warn("Error occurred during processing of message.", cause); + } else { + LOGGER.error("Error occurred during processing of message.", x); + } + } finally { + if (eventHandler != null) { + eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol); + } + if (inputTransport != null) { + inputTransport.close(); + } + if (outputTransport != null) { + outputTransport.close(); + } + if (client_.isOpen()) { + client_.close(); + } + } + } + } +} + -- 2.28.0.windows.1