  1. Apache Helix
  2. HELIX-443

Race condition in Helix register/unregister MessageHandlerFactory

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 0.7.1, 0.6.4
    • None
    • None

    Description

      When a ZooKeeper session expires, we reset all listeners, including HelixTaskExecutor (which is a message listener on a Helix participant). reset() calls HelixTaskExecutor#unregisterMessageHandlerFactory(), which throws an exception if the executor does not terminate within 200 ms. That exception in turn skips the removal of the MessageHandlerFactory from the handler-factory map.

        void unregisterMessageHandlerFactory(String type) {
          // shutdown executor-service. disconnect if fail
          ExecutorService executorSvc = _executorMap.remove(type);
          if (executorSvc != null) {
            List<Runnable> tasksLeft = executorSvc.shutdownNow();
            LOG.info(tasksLeft.size() + " tasks never executed for msgType: " + type + ". tasks: "
                + tasksLeft);
            try {
              if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
                LOG.error("executor-service for msgType: " + type
                    + " is not fully terminated in 200ms. will disconnect helix-participant");
                throw new HelixException("fail to unregister msg-handler for msgType: " + type);
              }
            } catch (InterruptedException e) {
              LOG.error("interruped when waiting for executor-service shutdown for msgType: " + type, e);
            }
          }
      
          // reset state-model
          MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type);
          if (handlerFty != null) {
            handlerFty.reset();
          }
        }
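One way to keep the two maps consistent is to move the factory removal into a `finally` block, so that it runs even when the timeout exception is thrown. The following is a minimal sketch under that assumption, not necessarily the fix that was committed for this issue. `UnregisterSketch` and its nested `Factory` interface are hypothetical stand-ins for HelixTaskExecutor and MessageHandlerFactory, and `IllegalStateException` stands in for HelixException.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for HelixTaskExecutor; not the real Helix class.
public class UnregisterSketch {
  // Stand-in for MessageHandlerFactory, reduced to the one method used here.
  interface Factory {
    void reset();
  }

  final Map<String, ExecutorService> executorMap = new ConcurrentHashMap<>();
  final Map<String, Factory> handlerFactoryMap = new ConcurrentHashMap<>();

  void unregister(String type) {
    try {
      ExecutorService executorSvc = executorMap.remove(type);
      if (executorSvc != null) {
        executorSvc.shutdownNow();
        if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
          // Stand-in for the HelixException thrown in the original method.
          throw new IllegalStateException("fail to unregister msg-handler for msgType: " + type);
        }
      }
    } catch (InterruptedException e) {
      // Preserve the interrupt status for callers further up the stack.
      Thread.currentThread().interrupt();
    } finally {
      // Runs on every exit path, so the factory entry never outlives the executor entry.
      Factory factory = handlerFactoryMap.remove(type);
      if (factory != null) {
        factory.reset();
      }
    }
  }
}
```

With this ordering the timeout is still reported to the caller, but a later re-registration finds no stale factory entry and creates a fresh executor.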
      

      When we reconnect to ZooKeeper, we re-register the message-handler factory. Registration first checks whether a factory already exists for the message type, and adds an executor only if none does:

        public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
            int threadpoolSize) {
          if (!_handlerFactoryMap.containsKey(type)) {
            if (!type.equalsIgnoreCase(factory.getMessageType())) {
              throw new HelixException("Message factory type mismatch. Type: " + type + " factory : "
                  + factory.getMessageType());
      
            }
            _handlerFactoryMap.put(type, factory);
            ExecutorService executorSvc = Executors.newFixedThreadPool(threadpoolSize);
            _executorMap.put(type, executorSvc);
      
            LOG.info("Added msg-factory for type: " + type + ", threadpool size " + threadpoolSize);
          } else {
            LOG.warn("Fail to register msg-handler-factory for type: " + type + ", pool-size: "
                + threadpoolSize + ", factory: " + factory);
          }
        }
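Registration could also be made tolerant of a leftover factory entry by treating the two maps independently: keep the factory if one is present, and create the executor whenever it is missing. The sketch below assumes that approach. `RegisterSketch` and its `Factory` interface are hypothetical, and `IllegalArgumentException` stands in for HelixException.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for HelixTaskExecutor; not the real Helix class.
public class RegisterSketch {
  // Stand-in for MessageHandlerFactory, reduced to the one method used here.
  interface Factory {
    String getMessageType();
  }

  final ConcurrentMap<String, Factory> handlerFactoryMap = new ConcurrentHashMap<>();
  final ConcurrentMap<String, ExecutorService> executorMap = new ConcurrentHashMap<>();

  void register(String type, Factory factory, int threadpoolSize) {
    if (!type.equalsIgnoreCase(factory.getMessageType())) {
      throw new IllegalArgumentException("Message factory type mismatch. Type: " + type
          + " factory : " + factory.getMessageType());
    }
    // Keep an existing factory if there is one; otherwise store the new one.
    handlerFactoryMap.putIfAbsent(type, factory);
    // Create the executor whenever it is missing, even if a factory entry survived.
    executorMap.computeIfAbsent(type, t -> Executors.newFixedThreadPool(threadpoolSize));
  }
}
```

Note the design choice: a surviving factory is kept rather than replaced, so any state it holds from before the session expiry is reused. Whether that is acceptable depends on what the factory's reset() is expected to clear.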
      

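      So if we fail to remove the message-handler factory, the stale entry makes re-registration skip creating the executor. The old executor has already been removed from the executor map by then, so no executor exists for that message type, which leads to an NPE when we receive a message: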

      java.lang.NullPointerException
              at org.apache.helix.messaging.handling.HelixTaskExecutor.scheduleTask(HelixTaskExecutor.java:243)
              at org.apache.helix.messaging.handling.HelixTaskExecutor.onMessage(HelixTaskExecutor.java:531)
              at org.apache.helix.manager.zk.CallbackHandler.invoke(CallbackHandler.java:195)
              at org.apache.helix.manager.zk.CallbackHandler.handleChildChange(CallbackHandler.java:404)
              at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
              at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
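The sequence can be reproduced deterministically with a reduced model of the two maps. This is an illustrative sketch only: `StaleFactoryRepro` is a hypothetical class, the 200 ms timeout is simulated by a boolean flag instead of a real stuck task, and the message type string is just an example key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of the two maps in HelixTaskExecutor; not the real class.
public class StaleFactoryRepro {
  final Map<String, Object> handlerFactoryMap = new HashMap<>();
  final Map<String, ExecutorService> executorMap = new HashMap<>();

  // Same check-then-put shape as registerMessageHandlerFactory.
  void register(String type, Object factory) {
    if (!handlerFactoryMap.containsKey(type)) {
      handlerFactoryMap.put(type, factory);
      executorMap.put(type, Executors.newFixedThreadPool(1));
    }
  }

  // Same ordering as unregisterMessageHandlerFactory; the timeout is simulated by a flag.
  void unregister(String type, boolean simulateTimeout) {
    ExecutorService executorSvc = executorMap.remove(type);
    if (executorSvc != null) {
      executorSvc.shutdownNow();
      if (simulateTimeout) {
        throw new IllegalStateException("fail to unregister msg-handler for msgType: " + type);
      }
    }
    handlerFactoryMap.remove(type);
  }

  // Runs register, unregister, re-register and reports whether an executor exists afterwards.
  static boolean executorPresentAfterExpiry(boolean simulateTimeout) {
    StaleFactoryRepro executor = new StaleFactoryRepro();
    String type = "STATE_TRANSITION"; // example key
    executor.register(type, new Object());
    try {
      executor.unregister(type, simulateTimeout);
    } catch (IllegalStateException e) {
      // unregister aborted before the factory entry was removed
    }
    executor.register(type, new Object()); // re-register after reconnect
    ExecutorService svc = executor.executorMap.get(type);
    if (svc != null) {
      svc.shutdown();
    }
    return svc != null;
  }
}
```

`executorPresentAfterExpiry(false)` returns true, while `executorPresentAfterExpiry(true)` returns false: the leftover factory entry blocks the second registration, and any code that dereferences the executor lookup for that type then fails with a NullPointerException.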
      

          People

            dafu Zhen Zhang