GEODE-9538

NullPointerException in ServerConnection.doNormalMessage()

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Component/s: membership

    Description

      I hit this issue while running chaos tests against a GemFire cluster with 2 locators and 3 servers. SSL is enabled and a dummy SecurityManager is configured to authenticate and authorize a pre-configured set of well-known users.
      There are 3 PARTITION_REDUNDANT regions configured, one per client, each with 1 redundant copy. Once the cluster is up and running, 3 clients continuously execute Region.get and Region.put operations on a known set of keys, each against its own region (created with the PROXY type), while another process executes the following logic in parallel (pseudocode):

      for server in ${servers}
      do
      	# SERVER_PID is the PID of the JVM running ${server}
      	# Pause the JVM for 30 seconds to simulate a stop-the-world GC
      	kill -STOP "${SERVER_PID}"
      	sleep 30
      
      	# Resume the JVM, then wait for the member to reconnect and the regions to recover their configured redundancy
      	kill -CONT "${SERVER_PID}"
      	waitForReconnectCompletedInServerLog
      	waitForNumBucketsWithoutRedundancyToBeZeroInGfshShowRegionMetrics
      done
      

      The test works fine most of the time, but randomly fails due to an unexpected exception in the logs of at least one server. The exception is always reported from a ServerConnection thread on the server member that has just been resumed, for example:

      [info 2021/08/09 11:01:07.430 GMT system-test-gemfire-server-2 <Pooled Waiting Message Processor 11> tid=0x8d] Configured redundancy of 2 copies has been restored to /system-test-client-7f6795dfb8-v7hh8-region
      
      [warn 2021/08/09 11:02:19.742 GMT system-test-gemfire-server-2 <ClientHealthMonitor Thread> tid=0x4d] Server connection from [identity(system-test-client-7f6795dfb8-pc8mv(SpringBasedClientCacheApplication:1:loner):34788:814b8d2a:SpringBasedClientCacheApplication,connection=1; port=50264] is being terminated because its client timeout of 10000 has expired.
      
      [warn 2021/08/09 11:02:19.744 GMT system-test-gemfire-server-2 <ClientHealthMonitor Thread> tid=0x4d] ClientHealthMonitor: Unregistering client with member id identity(system-test-client-7f6795dfb8-pc8mv(SpringBasedClientCacheApplication:1:loner):34788:814b8d2a:SpringBasedClientCacheApplication,connection=1 due to: Unknown reason
      
      [info 2021/08/09 11:02:19.745 GMT system-test-gemfire-server-2 <unicast receiver,system-test-gemfire-server-2-56622> tid=0x1e] received suspect message from system-test-gemfire-locator-0(system-test-gemfire-locator-0:1:locator)<ec><v1>:41000 for system-test-gemfire-server-2(system-test-gemfire-server-2:1)<v3>:41000: Member isn't responding to heartbeat requests
      
      [info 2021/08/09 11:02:19.747 GMT system-test-gemfire-server-2 <unicast receiver,system-test-gemfire-server-2-56622> tid=0x1e] Membership received a request to remove system-test-gemfire-server-2(system-test-gemfire-server-2:1)<v3>:41000 from system-test-gemfire-locator-1(system-test-gemfire-locator-1:1:locator)<ec><v0>:41000 reason=Member isn't responding to heartbeat requests
      
      [warn 2021/08/09 11:02:19.748 GMT system-test-gemfire-server-2 <StatSampler> tid=0x38] Statistics sampling thread detected a wakeup delay of 29965 ms, indicating a possible resource issue. Check the GC, memory, and CPU statistics.
      
      ...
      
      [warn 2021/08/09 11:02:19.854 GMT system-test-gemfire-server-2 <ServerConnection on port 40404 Thread 9> tid=0x91] ClientHealthMonitor: Unregistering client with member id identity(system-test-client-7f6795dfb8-v7hh8(SpringBasedClientCacheApplication:1:loner):44012:ec3c8d2a:SpringBasedClientCacheApplication,connection=1 due to: The connection has been reset while reading the header
      
      [info 2021/08/09 11:02:19.867 GMT system-test-gemfire-server-2 <unicast receiver,system-test-gemfire-server-2-56622> tid=0x1e] saving cache server configuration for use with the cluster-configuration service on reconnect
      
      [info 2021/08/09 11:02:19.867 GMT system-test-gemfire-server-2 <unicast receiver,system-test-gemfire-server-2-56622> tid=0x1e] cache server configuration saved
      
      [fatal 2021/08/09 11:02:19.876 GMT system-test-gemfire-server-2 <ServerConnection on port 40404 Thread 14> tid=0xa9] Uncaught exception in thread Thread[ServerConnection on port 40404 Thread 14,5,main]
      java.lang.NullPointerException
      	at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doNormalMessage(ServerConnection.java:865)
      	at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doOneMessage(ServerConnection.java:1022)
      	at org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:1275)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at org.apache.geode.internal.cache.tier.sockets.AcceptorImpl.lambda$initializeServerConnectionThreadPool$3(AcceptorImpl.java:690)
      	at org.apache.geode.logging.internal.executors.LoggingThreadFactory.lambda$newThread$0(LoggingThreadFactory.java:120)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      

      The problem is really hard to reproduce: we only hit it twice in around 200 runs. Logs and statistics can be found here.
      The logs don't show much beyond what I've included above, but I believe there's a race condition between the handleTermination() and doNormalMessage() methods of the ServerConnection class. One of the last steps in handleTermination() is to set the clientUserAuths attribute to null, and I haven't found any synchronization around this.

      void handleTermination(boolean timedOut) {
      ...
          if (unregisterClient) {
            // last serverconnection call all close on auth objects
            cleanClientAuths();
          }
          clientUserAuths = null;
          if (needsUnregister) {
            acceptor.getClientHealthMonitor().removeConnection(proxyId, this);
            if (unregisterClient) {
              acceptor.getClientHealthMonitor().unregisterClient(proxyId, getAcceptor(),
                  clientDisconnectedCleanly, clientDisconnectedException);
            }
          }
      ...
      }
      

      I might be wrong here, but I think that if handleTermination() runs interleaved with doNormalMessage(), specifically after doNormalMessage() checks whether the member is shutting down but before it gets the actual subject, the clientUserAuths attribute may already have been set to null by handleTermination(), and dereferencing it throws the NullPointerException.
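
      To show the suspected interleaving in isolation, here is a minimal sketch that does not use any Geode classes. Connection, unsafeLookup, safeLookup and the betweenCheckAndUse hook are hypothetical names made up for this illustration; only clientUserAuths and handleTermination echo the real ServerConnection members, and the real logic is more involved. The hook plays the role of the other thread, so the interleaving is deterministic. Reading the field once into a local variable is just one possible guard, not necessarily the right fix for Geode.

```java
import java.util.HashMap;
import java.util.Map;

public class TerminationRaceSketch {

  // Hypothetical stand-in for ServerConnection; NOT the real Geode class.
  static class Connection {
    private volatile Map<Long, String> clientUserAuths = new HashMap<>();
    private volatile boolean terminated;

    Connection() {
      clientUserAuths.put(1L, "subject-1");
    }

    // Mirrors the step described above: the field is nulled on termination.
    void handleTermination() {
      terminated = true;
      clientUserAuths = null;
    }

    // Check-then-act: the field is read again after the check, so a
    // termination that runs in between (simulated by the hook) causes an NPE.
    String unsafeLookup(long id, Runnable betweenCheckAndUse) {
      if (terminated) {
        return null;
      }
      betweenCheckAndUse.run();
      return clientUserAuths.get(id);
    }

    // Same flow, but the field is read once into a local and null-checked.
    String safeLookup(long id, Runnable betweenCheckAndUse) {
      if (terminated) {
        return null;
      }
      betweenCheckAndUse.run();
      Map<Long, String> auths = clientUserAuths; // single read of the field
      if (auths == null) {
        return null; // connection was terminated after the first check
      }
      return auths.get(id);
    }
  }

  public static void main(String[] args) {
    Connection unsafe = new Connection();
    try {
      unsafe.unsafeLookup(1L, unsafe::handleTermination);
    } catch (NullPointerException e) {
      System.out.println("unsafe lookup threw NullPointerException");
    }

    Connection safe = new Connection();
    String subject = safe.safeLookup(1L, safe::handleTermination);
    System.out.println("safe lookup returned " + subject);
  }
}
```

      In this sketch the guarded variant returns null when termination wins the race; a real caller would still have to decide how to handle a request on a connection that is already being torn down.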

      Below is a small test that can be used to easily reproduce the issue:

      package org.apache.geode.internal.cache.tier.sockets;
      
      import static org.assertj.core.api.Assertions.assertThatCode;
      import static org.mockito.ArgumentMatchers.any;
      import static org.mockito.Mockito.mock;
      import static org.mockito.Mockito.spy;
      import static org.mockito.Mockito.when;
      import static org.mockito.quality.Strictness.LENIENT;
      
      import java.net.InetAddress;
      import java.net.Socket;
      import java.util.Random;
      
      import org.junit.Rule;
      import org.junit.Test;
      import org.junit.experimental.categories.Category;
      import org.mockito.junit.MockitoJUnit;
      import org.mockito.junit.MockitoRule;
      
      import org.apache.geode.distributed.internal.DistributionManager;
      import org.apache.geode.distributed.internal.InternalDistributedSystem;
      import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
      import org.apache.geode.internal.cache.InternalCache;
      import org.apache.geode.internal.cache.TXManagerImpl;
      import org.apache.geode.internal.cache.tier.Acceptor;
      import org.apache.geode.internal.cache.tier.CachedRegionHelper;
      import org.apache.geode.internal.cache.tier.CommunicationMode;
      import org.apache.geode.internal.cache.tier.MessageType;
      import org.apache.geode.internal.cache.tier.ServerSideHandshake;
      import org.apache.geode.internal.monitoring.ThreadsMonitoring;
      import org.apache.geode.internal.security.SecurityService;
      import org.apache.geode.internal.serialization.KnownVersion;
      import org.apache.geode.test.junit.categories.ClientServerTest;
      
      @Category(ClientServerTest.class)
      public class ServerConnectionNullPointerExceptionIntegrationTest {
        @Rule
        public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(LENIENT);
      
        @Test
        public void nullPointerExceptionNotThrownWhenHandleTerminationIsInvokedInterleavedWithDoOneMessage() {
          InetAddress inetAddress = mock(InetAddress.class);
          AcceptorImpl acceptor = mock(AcceptorImpl.class);
          Socket socket = mock(Socket.class);
          InternalCache cache = mock(InternalCache.class);
          CachedRegionHelper cachedRegionHelper = mock(CachedRegionHelper.class);
          SecurityService securityService = mock(SecurityService.class);
          when(securityService.isIntegratedSecurity()).thenReturn(true);
          CacheServerStats stats = mock(CacheServerStats.class);
      
          when(inetAddress.getHostAddress()).thenReturn("localhost");
          when(socket.getInetAddress()).thenReturn(inetAddress);
      
          InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
          DistributionManager distributionManager = mock(DistributionManager.class);
          ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
      
          when(cachedRegionHelper.getCache()).thenReturn(cache);
          when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
          when(internalDistributedSystem.getDM()).thenReturn(distributionManager);
          when(distributionManager.getThreadMonitoring()).thenReturn(threadsMonitoring);
      
          when(cache.getCacheTransactionManager()).thenReturn(mock(TXManagerImpl.class));
      
          ServerConnectionCollection mockCollection = spy(ServerConnectionCollection.class);
          when(mockCollection.incrementConnectionsProcessing()).thenReturn(true);
      
          ClientHealthMonitor mockClientHealthMonitor = mock(ClientHealthMonitor.class);
          when(mockClientHealthMonitor.addConnection(any(), any())).thenReturn(mockCollection);
          when(acceptor.getClientHealthMonitor()).thenReturn(mockClientHealthMonitor);
          when(acceptor.getConnectionListener()).thenReturn(mock(ConnectionListener.class));
      
          TestServerConnection testServerConnection = new TestServerConnection(socket, cache,
              cachedRegionHelper, stats, 0, 0, null,
              CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor, securityService);
      
          assertThatCode(testServerConnection::run).doesNotThrowAnyException();
        }
      
        private static class TestMessage extends Message {
          TestMessage() {
            super(3, KnownVersion.CURRENT);
            messageType = MessageType.REQUEST;
            securePart = new Part();
          }
      
          @Override
          public void receive() {
          }
        }
      
        private static class TestServerConnection extends ServerConnection {
          TestServerConnection(Socket socket, InternalCache internalCache,
              CachedRegionHelper cachedRegionHelper, CacheServerStats stats, int hsTimeout,
              int socketBufferSize, String communicationModeStr, byte communicationMode,
              Acceptor acceptor, SecurityService securityService) {
            super(socket, internalCache, cachedRegionHelper, stats, hsTimeout, socketBufferSize,
                communicationModeStr, communicationMode, acceptor, securityService);
            setClientDisconnectCleanly();
          }
      
          @Override
          protected void doHandshake() {
            ClientProxyMembershipID proxyID = mock(ClientProxyMembershipID.class);
            ServerSideHandshake handshake = mock(ServerSideHandshake.class);
            MessageIdExtractor extractor = mock(MessageIdExtractor.class);
            when(handshake.getVersion()).thenReturn(KnownVersion.CURRENT);
            when(proxyID.getDistributedMember()).thenReturn(mock(InternalDistributedMember.class));
            setHandshake(handshake);
            setProxyId(proxyID);
            processHandShake();
            setFakeRequest();
      
            super.doHandshake();
          }
      
          // Naive approach to simulate handleTermination being invoked in between
          @Override
          public long getUniqueId() {
            handleTermination();
            return new Random().nextLong();
          }
      
          private void setFakeRequest() {
            TestMessage testMessage = new TestMessage();
            setRequestMessage(testMessage);
          }
        }
      }
      

          People

            Assignee: Unassigned
            Reporter: jjramos (Juan Ramos)