Description
Currently,thread number of broadcast-exchange thread pool is fixed and keepAliveSeconds is also fixed as 60s.
object BroadcastExchangeExec { private[execution] val executionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128)) } /** * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer. */ def newDaemonCachedThreadPool( prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = { val threadFactory = namedThreadFactory(prefix) val threadPool = new ThreadPoolExecutor( maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used keepAliveSeconds, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable], threadFactory) threadPool.allowCoreThreadTimeOut(true) threadPool }
But some times, if the Thead object do not GC quickly it may caused server(driver) OOM. In such case,we need to make this thread pool configurable.
Below is an example: