Index: shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java =================================================================== --- shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java (revision 1309631) +++ shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java (working copy) @@ -41,7 +41,9 @@ private ZooKeeper zkClient = null; private int zkPort = -1; private ZooKeeperTokenStore ts; - + // connect timeout large enough for slower test environments + private final int connectTimeoutMillis = 30000; + @Override protected void setUp() throws Exception { File zkDataDir = new File(System.getProperty("java.io.tmpdir")); @@ -50,8 +52,9 @@ } this.zkCluster = new MiniZooKeeperCluster(); this.zkPort = this.zkCluster.startup(zkDataDir); - this.zkClient = new ZooKeeper("localhost:" - + zkPort, 300, null); + + this.zkClient = ZooKeeperTokenStore.createConnectedClient("localhost:" + zkPort, 3000, + connectTimeoutMillis); } @Override @@ -72,6 +75,9 @@ conf.set( HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE, zkPath); + conf.setLong( + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, + connectTimeoutMillis); return conf; } Index: shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java =================================================================== --- shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (revision 1309631) +++ shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (working copy) @@ -213,6 +213,8 @@ "hive.cluster.delegation.token.store.class"; public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = "hive.cluster.delegation.token.store.zookeeper.connectString"; + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS = + "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis"; public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE = "hive.cluster.delegation.token.store.zookeeper.znode"; public static final String DELEGATION_TOKEN_STORE_ZK_ACL = Index: shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java =================================================================== --- shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (revision 1309631) +++ shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (working copy) @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -30,6 +32,7 @@ import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs.Ids; @@ -56,6 +59,7 @@ private volatile ZooKeeper zkSession; private String zkConnectString; private final int zkSessionTimeout = 3000; + private long connectTimeoutMillis = -1; private List newNodeAcl = Ids.OPEN_ACL_UNSAFE; private class ZooKeeperWatcher implements Watcher { @@ -70,6 +74,7 @@ } } } + } /** @@ -89,8 +94,8 @@ synchronized (this) { if (zkSession == null || zkSession.getState() == States.CLOSED) { try { - zkSession = new ZooKeeper(this.zkConnectString, this.zkSessionTimeout, - new ZooKeeperWatcher()); + zkSession = createConnectedClient(this.zkConnectString, this.zkSessionTimeout, + this.connectTimeoutMillis, new ZooKeeperWatcher()); } catch (IOException ex) { throw new TokenStoreException("Token store error.", ex); } @@ -101,6 +106,49 @@ } /** + * Create a ZooKeeper session that is in connected state. + * + * @param connectString ZooKeeper connect String + * @param sessionTimeout ZooKeeper session timeout + * @param connectTimeout milliseconds to wait for connection, 0 or negative value means no wait + * @param watchers + * @return + * @throws InterruptedException + * @throws IOException + */ + public static ZooKeeper createConnectedClient(String connectString, + int sessionTimeout, long connectTimeout, final Watcher... watchers) + throws IOException { + final CountDownLatch connected = new CountDownLatch(1); + Watcher connectWatcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + switch (event.getState()) { + case SyncConnected: + connected.countDown(); + break; + } + for (Watcher w : watchers) { + w.process(event); + } + } + }; + ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, connectWatcher); + if (connectTimeout > 0) { + try { + if (!connected.await(connectTimeout, TimeUnit.MILLISECONDS)) { + zk.close(); + throw new IOException("Timeout waiting for connection after " + + connectTimeout + "ms"); + } + } catch (InterruptedException e) { + throw new IOException("Error waiting for connection.", e); + } + } + return zk; + } + + /** * Create a path if it does not already exist ("mkdir -p") * @param zk ZooKeeper session * @param path string with '/' separator @@ -215,6 +263,8 @@ } this.zkConnectString = conf.get( HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null); + this.connectTimeoutMillis = conf.getLong( + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, -1); this.rootNode = conf.get( HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE, HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT);