Description
When a zk session expires, we reset all the listeners, including HelixTaskExecutor (which is a message listener on the Helix participant). reset() calls HelixTaskExecutor#unregisterMessageHandlerFactory(), which throws an exception if the executor has not terminated within 200ms. The exception, in turn, skips the removal of the MessageHandlerFactory from the handler-factory map.
    void unregisterMessageHandlerFactory(String type) {
      // shutdown executor-service. disconnect if fail
      ExecutorService executorSvc = _executorMap.remove(type);
      if (executorSvc != null) {
        List<Runnable> tasksLeft = executorSvc.shutdownNow();
        LOG.info(tasksLeft.size() + " tasks never executed for msgType: " + type + ". tasks: " + tasksLeft);
        try {
          if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
            LOG.error("executor-service for msgType: " + type
                + " is not fully terminated in 200ms. will disconnect helix-participant");
            throw new HelixException("fail to unregister msg-handler for msgType: " + type);
          }
        } catch (InterruptedException e) {
          LOG.error("interruped when waiting for executor-service shutdown for msgType: " + type, e);
        }
      }

      // reset state-model
      MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type);
      if (handlerFty != null) {
        handlerFty.reset();
      }
    }
When we re-connect to zk, we re-register the message-handler factory. Registration first checks whether a factory for the type already exists, and adds the factory and a new executor only if it does not:
    public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory, int threadpoolSize) {
      if (!_handlerFactoryMap.containsKey(type)) {
        if (!type.equalsIgnoreCase(factory.getMessageType())) {
          throw new HelixException("Message factory type mismatch. Type: " + type
              + " factory : " + factory.getMessageType());
        }
        _handlerFactoryMap.put(type, factory);
        ExecutorService executorSvc = Executors.newFixedThreadPool(threadpoolSize);
        _executorMap.put(type, executorSvc);
        LOG.info("Added msg-factory for type: " + type + ", threadpool size " + threadpoolSize);
      } else {
        LOG.warn("Fail to register msg-handler-factory for type: " + type
            + ", pool-size: " + threadpoolSize + ", factory: " + factory);
      }
    }
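So if we fail to remove the message-handler factory, the stale entry causes re-registration to be skipped and no new executor is created. Since the old executor was already removed from the executor map before the exception was thrown, this leads to an NPE when we receive a message: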
    java.lang.NullPointerException
        at org.apache.helix.messaging.handling.HelixTaskExecutor.scheduleTask(HelixTaskExecutor.java:243)
        at org.apache.helix.messaging.handling.HelixTaskExecutor.onMessage(HelixTaskExecutor.java:531)
        at org.apache.helix.manager.zk.CallbackHandler.invoke(CallbackHandler.java:195)
        at org.apache.helix.manager.zk.CallbackHandler.handleChildChange(CallbackHandler.java:404)
        at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
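The sequence above can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the Helix code: HandlerRegistrySketch, its method names, and the simulateSlowShutdown flag are hypothetical, the factory is a plain Object, and the 200ms awaitTermination timeout is replaced by a flag so the failure is deterministic. unregister() mirrors the ordering quoted above (throw before the factory entry is removed), while unregisterSafely() shows one possible remedy, removing the factory entry in a finally block. It is not necessarily the fix that was applied.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical, simplified model of the two maps; not the actual HelixTaskExecutor. */
class HandlerRegistrySketch {
    private final Map<String, ExecutorService> executorMap = new HashMap<>();
    private final Map<String, Object> handlerFactoryMap = new HashMap<>();

    /** Same guard as the quoted register method: skip everything if a factory entry exists. */
    void register(String type, Object factory, int poolSize) {
        if (!handlerFactoryMap.containsKey(type)) {
            handlerFactoryMap.put(type, factory);
            executorMap.put(type, Executors.newFixedThreadPool(poolSize));
        }
    }

    /** Mirrors the quoted ordering: the exception is thrown before the factory entry is removed. */
    void unregister(String type, boolean simulateSlowShutdown) {
        shutdownExecutor(type, simulateSlowShutdown);
        handlerFactoryMap.remove(type); // skipped when shutdownExecutor throws
    }

    /** One possible remedy (an assumption): always drop the factory entry, even on failure. */
    void unregisterSafely(String type, boolean simulateSlowShutdown) {
        try {
            shutdownExecutor(type, simulateSlowShutdown);
        } finally {
            handlerFactoryMap.remove(type);
        }
    }

    private void shutdownExecutor(String type, boolean simulateSlowShutdown) {
        ExecutorService executor = executorMap.remove(type);
        if (executor != null) {
            executor.shutdownNow();
            if (simulateSlowShutdown) {
                // Stands in for awaitTermination(200, MILLISECONDS) returning false.
                throw new IllegalStateException("fail to unregister msg-handler for msgType: " + type);
            }
        }
    }

    boolean hasFactory(String type) { return handlerFactoryMap.containsKey(type); }

    boolean hasExecutor(String type) { return executorMap.containsKey(type); }

    /** Shuts down any executors still registered. */
    void shutdownAll() {
        for (ExecutorService executor : executorMap.values()) {
            executor.shutdownNow();
        }
        executorMap.clear();
    }

    public static void main(String[] args) {
        String type = "STATE_TRANSITION";

        HandlerRegistrySketch buggy = new HandlerRegistrySketch();
        buggy.register(type, new Object(), 1);
        try {
            buggy.unregister(type, true);      // session expiry: reset fails midway
        } catch (IllegalStateException e) {
            // the factory entry is left behind
        }
        buggy.register(type, new Object(), 1); // reconnect: registration is skipped
        System.out.println("buggy: factory=" + buggy.hasFactory(type)
            + " executor=" + buggy.hasExecutor(type));

        HandlerRegistrySketch fixed = new HandlerRegistrySketch();
        fixed.register(type, new Object(), 1);
        try {
            fixed.unregisterSafely(type, true);
        } catch (IllegalStateException e) {
            // the factory entry was still removed by the finally block
        }
        fixed.register(type, new Object(), 1); // reconnect: a new executor is created
        System.out.println("fixed: factory=" + fixed.hasFactory(type)
            + " executor=" + fixed.hasExecutor(type));
        fixed.shutdownAll();
    }
}
```

In the first scenario the registry ends up with a factory but no executor for the type, which is the state in which a lookup in the executor map returns null and scheduling a task fails. In the second scenario both entries are present again after reconnect.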