Uploaded image for project: 'Accumulo'
  1. Accumulo
  2. ACCUMULO-2027

ZooKeeperInstance.close() not freeing resources in multithreaded env

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Critical
    • Resolution: Won't Fix
    • None
    • None
    • None
    • None

    Description

      While looking at the changes related to ZooKeeperInstance.close() in the 1.4.5-SNAPSHOT branch, I noticed there were race conditions where resources were not properly released. One type of race condition is where a thread is between a closed check in ZooKeeperInstance and calling a ZooCache method when ZooKeeperInstance.close() is called. The following is an example situation:

      1. Thread 1 uses ZooKeeperInstance1 to get a zoocache.
      2. Thread 2 calls close() on ZooKeeperInstance1, which calls close() on zoocache.
      3. Thread 1 uses the zoocache it has a reference to, causing a new zookeeper connection to be created.

      Below is an example program that will trigger this behavior. For me this little example program reliably shows a connected zookeeper after all of the threads die. If I use 0 threads it will show a closed zookeeper connection at the end.

       /**
        * Background writer for the reproduction: keeps adding one randomly generated
        * mutation at a time to table "foo5" and flushing it, until any exception ends
        * the loop.
        */
       static class WriteTask implements Runnable {

          private BatchWriter writer;
          private Random rand;

          /**
           * Creates the random source and then opens a batch writer on "foo5" through
           * the given connector.
           *
           * @param conn connector used to create the batch writer
           * @throws TableNotFoundException propagated from createBatchWriter
           */
          WriteTask(Connector conn) throws TableNotFoundException {
            this.rand = new Random();
            this.writer = conn.createBatchWriter("foo5", 10000000, 30000, 1);
          }

          /**
           * Writes and flushes mutations forever. The loop has no exit condition; it
           * ends only when something throws, at which point the exception message is
           * printed to stdout and the thread finishes.
           */
          @Override
          public void run() {
            try {
              for (;;) {
                // Draw the four random components in the same order they are consumed:
                // row, column family, column qualifier, value.
                String row = zeroPadded(1000000);
                String family = zeroPadded(100);
                String qualifier = zeroPadded(100);
                String value = zeroPadded(1000000);

                Mutation mutation = new Mutation(row);
                mutation.put(family, qualifier, value);

                writer.addMutation(mutation);
                writer.flush();
              }
            } catch (Exception e) {
              System.out.println(e.getMessage());
            }
          }

          /** Returns a random int in [0, bound) rendered as a six-digit, zero-padded string. */
          private String zeroPadded(int bound) {
            return String.format("%06d", rand.nextInt(bound));
          }

        }
      
        /**
         * Background reader for the reproduction: repeatedly iterates over a scanner
         * on table "foo5", discarding every entry, until any exception ends the loop.
         */
        static class ReadTask implements Runnable {

          private Scanner scanner;

          /**
           * Opens a scanner on "foo5" with a default-constructed Authorizations.
           *
           * @param conn connector used to create the scanner
           * @throws TableNotFoundException propagated from createScanner
           */
          ReadTask(Connector conn) throws TableNotFoundException {
            this.scanner = conn.createScanner("foo5", new Authorizations());
          }

          /**
           * Scans the table over and over. The loop has no exit condition; it ends
           * only when something throws, at which point the exception message is
           * printed to stdout and the thread finishes.
           */
          @Override
          public void run() {
            try {
              for (;;) {
                drainOnce();
              }
            } catch (Exception e) {
              System.out.println(e.getMessage());
            }
          }

          /** Iterates the scanner once from start to end; entries are intentionally ignored. */
          private void drainOnce() {
            for (Entry<Key,Value> ignored : scanner) {
              // nothing to do: the point is only to keep the scan running
            }
          }

        }
      
        /**
         * Reproduction for the close() race: starts writer and reader threads that
         * share one Connector, closes the ZooKeeperInstance while they are running,
         * waits for them to die, and prints the ZooSession table before the close
         * and after the threads have finished.
         *
         * Per the accompanying report, the second printout reliably shows a connected
         * zookeeper session with 10 threads of each kind, and a closed one when
         * numThreads is 0.
         *
         * NOTE(review): {@code accumulo} is declared outside this snippet; presumably
         * it is the mini cluster used by the enclosing test class - confirm.
         */
        @Test(timeout = 30000)
        public void test2() throws Exception {
          ZooKeeperInstance zki = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
      
          Connector conn = zki.getConnector("root", "superSecret");
      
          // table used by both WriteTask and ReadTask
          conn.tableOperations().create("foo5");
      
          ArrayList<Thread> threads = new ArrayList<Thread>();
      
          // number of writer threads and, separately, of reader threads
          int numThreads = 10;
      
          // writers: every task is built from the same Connector
          for (int i = 0; i < numThreads; i++) {
            Thread t = new Thread(new WriteTask(conn));
            t.start();
            threads.add(t);
          }
      
          // readers: likewise share the same Connector
          for (int i = 0; i < numThreads; i++) {
            Thread t = new Thread(new ReadTask(conn));
            t.start();
            threads.add(t);
          }
      
          // let threads get spun up
          Thread.sleep(1000);
      
          // session states before the instance is closed
          ZooSession.printSessions();
      
          // close while the worker threads are still using the connector; this is
          // the step that exposes the race
          zki.close();
      
          // wait for the threads to die
          for (Thread thread : threads) {
            thread.join();
          }
      
          // session states after all threads are gone; see the report for the
          // observed outcome
          ZooSession.printSessions();
      
        }
      
      

      Below are some changes I made to ZooSession for debugging purposes.

      diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
      index b3db26f..475a21d 100644
      --- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
      +++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
      @@ -20,6 +20,8 @@
       import java.net.UnknownHostException;
       import java.util.HashMap;
       import java.util.Map;
      +import java.util.Map.Entry;
      +import java.util.Set;
       
       import org.apache.accumulo.core.util.UtilWaitThread;
       import org.apache.log4j.Logger;
      @@ -29,7 +31,7 @@
       import org.apache.zookeeper.ZooKeeper;
       import org.apache.zookeeper.ZooKeeper.States;
       
      -class ZooSession {
      +public class ZooSession {
         
         private static final Logger log = Logger.getLogger(ZooSession.class);
         
      @@ -121,6 +123,8 @@
           
           ZooSessionInfo zsi = sessions.get(sessionKey);
           if (zsi != null && zsi.zooKeeper.getState() == States.CLOSED) {
      +      System.out.println("Removing closed session ");
      +      new Exception().printStackTrace();
             if (auth != null && sessions.get(readOnlySessionKey) == zsi)
               sessions.remove(readOnlySessionKey);
             zsi = null;
      @@ -137,4 +141,13 @@
           }
           return zsi.zooKeeper;
         }
      +
      +  public static synchronized void printSessions() {
      +    Set<Entry<String,ZooSessionInfo>> es = sessions.entrySet();
      +
      +    for (Entry<String,ZooSessionInfo> entry : es) {
      +      System.out.println(entry.getKey() + " " + entry.getValue().zooKeeper.getState());
      +    }
      +  }
      +
       }
      

      With the above changes I will see an exception like the following when one of the race conditions occurs.

      Removing closed session 
      java.lang.Exception
      	at org.apache.accumulo.core.zookeeper.ZooSession.getSession(ZooSession.java:127)
      	at org.apache.accumulo.core.zookeeper.ZooReader.getSession(ZooReader.java:37)
      	at org.apache.accumulo.core.zookeeper.ZooReader.getZooKeeper(ZooReader.java:41)
      	at org.apache.accumulo.core.zookeeper.ZooCache.getZooKeeper(ZooCache.java:56)
      	at org.apache.accumulo.core.zookeeper.ZooCache.retry(ZooCache.java:127)
      	at org.apache.accumulo.core.zookeeper.ZooCache.get(ZooCache.java:233)
      	at org.apache.accumulo.core.zookeeper.ZooCache.get(ZooCache.java:188)
      	at org.apache.accumulo.core.client.ZooKeeperInstance.getInstanceID(ZooKeeperInstance.java:156)
      	at org.apache.accumulo.core.client.impl.TabletLocator.getInstance(TabletLocator.java:96)
      	at org.apache.accumulo.core.client.impl.ThriftScanner.scan(ThriftScanner.java:245)
      	at org.apache.accumulo.core.client.impl.ScannerIterator$Reader.run(ScannerIterator.java:94)
      	at org.apache.accumulo.core.client.impl.ScannerIterator.hasNext(ScannerIterator.java:176)
      	at org.apache.accumulo.minicluster.MiniAccumuloClusterTest$ReadTask.run(MiniAccumuloClusterTest.java:109)
      	at java.lang.Thread.run(Thread.java:662)
      

      Attachments

        Issue Links

          Activity

            People

              bills William Slacum
              kturner Keith Turner
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: