commit 86b59ee6f58a02f4e33df7f617120acd96bab02c Author: Vihang Karajgaonkar Date: Mon Sep 18 22:46:09 2017 -0700 HIVE-17371 : Move tokenstores to metastore module diff --git a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHiveAuthFactory.java b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHiveAuthFactory.java index e3a51909df14d90a538426163f332a6378f35f40..85efeab427f775cade53c6545f2cc6daccd04189 100644 --- a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHiveAuthFactory.java +++ b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHiveAuthFactory.java @@ -79,7 +79,7 @@ public void testStartTokenManagerForDBTokenStore() throws Exception { Assert.assertNotNull(keyTabFile); hiveConf.setVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB, keyTabFile); - hiveConf.setVar(ConfVars.METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS, "org.apache.hadoop.hive.thrift.DBTokenStore"); + hiveConf.setVar(ConfVars.METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS, "org.apache.hadoop.hive.metastore.security.DBTokenStore"); HiveAuthFactory authFactory = new HiveAuthFactory(hiveConf); Assert.assertNotNull(authFactory); diff --git a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithDBTokenStore.java b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithDBTokenStore.java index d690aaa673a50785561750f4f461ec867b6f0abc..ad2c3d0d2d3b1335087952aca4af4e2022a75954 100644 --- a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithDBTokenStore.java +++ b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithDBTokenStore.java @@ -32,7 +32,7 @@ public static void beforeTest() throws Exception { SessionHookTest.class.getName()); HiveConf hiveConf = new HiveConf(); - hiveConf.setVar(ConfVars.METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS, "org.apache.hadoop.hive.thrift.DBTokenStore"); + hiveConf.setVar(ConfVars.METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS, 
"org.apache.hadoop.hive.metastore.security.DBTokenStore"); miniHiveKdc = MiniHiveKdc.getMiniHiveKdc(hiveConf); miniHS2 = MiniHiveKdc.getMiniHS2WithKerbWithRemoteHMS(miniHiveKdc, hiveConf); miniHS2.start(confOverlay); diff --git a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/metastore/security/TestHadoopAuthBridge23.java b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/metastore/security/TestHadoopAuthBridge23.java index ef360400a1873ab142cb9cbdce045cbed394af0e..453651c8e998f4f4377ef7106d149ec4ca080a32 100644 --- a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/metastore/security/TestHadoopAuthBridge23.java +++ b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/metastore/security/TestHadoopAuthBridge23.java @@ -70,7 +70,7 @@ public static class MyTokenStore extends MemoryTokenStore { static volatile DelegationTokenStore TOKEN_STORE = null; - public void init(Object hmsHandler, HadoopThriftAuthBridge.Server.ServerMode smode) throws TokenStoreException { + public void init(Object hmsHandler, DelegationTokenStore.TokenStoreServerMode smode) throws TokenStoreException { super.init(hmsHandler, smode); TOKEN_STORE = this; try { diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestDBTokenStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestDBTokenStore.java new file mode 100644 index 0000000000000000000000000000000000000000..2b148dd75e7a86995646672c92daf4484a397eb2 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestDBTokenStore.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.security; + +import java.io.IOException; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.security.DBTokenStore; +import org.apache.hadoop.hive.metastore.security.DelegationTokenStore.TokenStoreException; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; +import org.apache.hive.service.auth.HiveTokenStoreDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; +import org.junit.Assert; + +public class TestDBTokenStore extends TestCase{ + + public void testDBTokenStore() throws TokenStoreException, MetaException, IOException { + + DelegationTokenStore ts = new DBTokenStore(); + ts.init(new HMSHandler("Test handler"), DelegationTokenStore.TokenStoreServerMode.METASTORE); + assertEquals(0, ts.getMasterKeys().length); + assertEquals(false,ts.removeMasterKey(-1)); + try{ + ts.updateMasterKey(-1, 
"non-existent-key"); + fail("Updated non-existent key."); + } catch (TokenStoreException e) { + assertTrue(e.getCause() instanceof NoSuchObjectException); + } + int keySeq = ts.addMasterKey("key1Data"); + int keySeq2 = ts.addMasterKey("key2Data"); + int keySeq2same = ts.addMasterKey("key2Data"); + assertEquals("keys sequential", keySeq + 1, keySeq2); + assertEquals("keys sequential", keySeq + 2, keySeq2same); + assertEquals("expected number of keys", 3, ts.getMasterKeys().length); + assertTrue(ts.removeMasterKey(keySeq)); + assertTrue(ts.removeMasterKey(keySeq2same)); + assertEquals("expected number of keys", 1, ts.getMasterKeys().length); + assertEquals("key2Data",ts.getMasterKeys()[0]); + ts.updateMasterKey(keySeq2, "updatedData"); + assertEquals("updatedData",ts.getMasterKeys()[0]); + assertTrue(ts.removeMasterKey(keySeq2)); + + // tokens + assertEquals(0, ts.getAllDelegationTokenIdentifiers().size()); + DelegationTokenIdentifier tokenId = new DelegationTokenIdentifier( + new Text("owner"), new Text("renewer"), new Text("realUser")); + assertNull(ts.getToken(tokenId)); + assertFalse(ts.removeToken(tokenId)); + DelegationTokenInformation tokenInfo = new DelegationTokenInformation( + 99, "password".getBytes()); + assertTrue(ts.addToken(tokenId, tokenInfo)); + assertFalse(ts.addToken(tokenId, tokenInfo)); + DelegationTokenInformation tokenInfoRead = ts.getToken(tokenId); + assertEquals(tokenInfo.getRenewDate(), tokenInfoRead.getRenewDate()); + assertNotSame(tokenInfo, tokenInfoRead); + Assert.assertArrayEquals(HiveDelegationTokenSupport + .encodeDelegationTokenInformation(tokenInfo), + HiveDelegationTokenSupport + .encodeDelegationTokenInformation(tokenInfoRead)); + + List allIds = ts + .getAllDelegationTokenIdentifiers(); + assertEquals(1, allIds.size()); + Assert.assertEquals(HiveTokenStoreDelegationTokenSecretManager + .encodeWritable(tokenId), + HiveTokenStoreDelegationTokenSecretManager.encodeWritable(allIds + .get(0))); + + 
assertTrue(ts.removeToken(tokenId)); + assertEquals(0, ts.getAllDelegationTokenIdentifiers().size()); + assertNull(ts.getToken(tokenId)); + ts.close(); + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java new file mode 100644 index 0000000000000000000000000000000000000000..8648f05878ad771efafdb67dd8393f0488982b9e --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.security; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; +import org.apache.hive.service.auth.HiveDelegationTokenManager; +import org.apache.hive.service.auth.HiveTokenStoreDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.ACL; +import org.junit.Assert; + +public class TestZooKeeperTokenStore extends TestCase { + + private MiniZooKeeperCluster zkCluster = null; + private CuratorFramework zkClient = null; + private int zkPort = -1; + private ZooKeeperTokenStore ts; + + @Override + protected void setUp() throws Exception { + File zkDataDir = new File(System.getProperty("test.tmp.dir")); + if (this.zkCluster != null) { + throw new IOException("Cluster already running"); + } + this.zkCluster = new MiniZooKeeperCluster(); + this.zkPort = this.zkCluster.startup(zkDataDir); + this.zkClient = + CuratorFrameworkFactory.builder().connectString("localhost:" + zkPort) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + this.zkClient.start(); + } + + @Override + protected void tearDown() 
throws Exception { + this.zkClient.close(); + if (ts != null) { + ts.close(); + } + this.zkCluster.shutdown(); + this.zkCluster = null; + } + + private Configuration createConf(String zkPath) { + Configuration conf = new Configuration(); + conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, "localhost:" + + this.zkPort); + conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE, zkPath); + return conf; + } + + public void testTokenStorage() throws Exception { + String ZK_PATH = "/zktokenstore-testTokenStorage"; + ts = new ZooKeeperTokenStore(); + Configuration conf = createConf(ZK_PATH); + conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, "world:anyone:cdrwa"); + ts.setConf(conf); + ts.init(null, DelegationTokenStore.TokenStoreServerMode.METASTORE); + + + String metastore_zk_path = ZK_PATH + ServerMode.METASTORE; + int keySeq = ts.addMasterKey("key1Data"); + byte[] keyBytes = zkClient.getData().forPath( + metastore_zk_path + "/keys/" + String.format(ZooKeeperTokenStore.ZK_SEQ_FORMAT, keySeq)); + assertNotNull(keyBytes); + assertEquals(new String(keyBytes), "key1Data"); + + int keySeq2 = ts.addMasterKey("key2Data"); + assertEquals("keys sequential", keySeq + 1, keySeq2); + assertEquals("expected number keys", 2, ts.getMasterKeys().length); + + ts.removeMasterKey(keySeq); + assertEquals("expected number keys", 1, ts.getMasterKeys().length); + + // tokens + DelegationTokenIdentifier tokenId = new DelegationTokenIdentifier( + new Text("owner"), new Text("renewer"), new Text("realUser")); + DelegationTokenInformation tokenInfo = new DelegationTokenInformation( + 99, "password".getBytes()); + ts.addToken(tokenId, tokenInfo); + DelegationTokenInformation tokenInfoRead = ts.getToken(tokenId); + assertEquals(tokenInfo.getRenewDate(), tokenInfoRead.getRenewDate()); + assertNotSame(tokenInfo, tokenInfoRead); + Assert.assertArrayEquals(HiveDelegationTokenSupport + .encodeDelegationTokenInformation(tokenInfo), + 
HiveDelegationTokenSupport + .encodeDelegationTokenInformation(tokenInfoRead)); + + List allIds = ts.getAllDelegationTokenIdentifiers(); + assertEquals(1, allIds.size()); + Assert.assertEquals(HiveTokenStoreDelegationTokenSecretManager + .encodeWritable(tokenId), + HiveTokenStoreDelegationTokenSecretManager.encodeWritable(allIds + .get(0))); + + assertTrue(ts.removeToken(tokenId)); + assertEquals(0, ts.getAllDelegationTokenIdentifiers().size()); + assertNull(ts.getToken(tokenId)); + } + + public void testAclNoAuth() throws Exception { + String ZK_PATH = "/zktokenstore-testAclNoAuth"; + Configuration conf = createConf(ZK_PATH); + conf.set( + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, + "ip:127.0.0.1:r"); + + ts = new ZooKeeperTokenStore(); + try { + ts.setConf(conf); + ts.init(null, DelegationTokenStore.TokenStoreServerMode.METASTORE); + fail("expected ACL exception"); + } catch (DelegationTokenStore.TokenStoreException e) { + assertEquals(KeeperException.NoAuthException.class, e.getCause().getClass()); + } + } + + public void testAclInvalid() throws Exception { + String ZK_PATH = "/zktokenstore-testAclInvalid"; + String aclString = "sasl:hive/host@TEST.DOMAIN:cdrwa, fail-parse-ignored"; + Configuration conf = createConf(ZK_PATH); + conf.set( + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, + aclString); + + List aclList = ZooKeeperTokenStore.parseACLs(aclString); + assertEquals(1, aclList.size()); + + ts = new ZooKeeperTokenStore(); + try { + ts.setConf(conf); + ts.init(null, DelegationTokenStore.TokenStoreServerMode.METASTORE); + fail("expected ACL exception"); + } catch (DelegationTokenStore.TokenStoreException e) { + assertEquals(KeeperException.InvalidACLException.class, e.getCause().getClass()); + } + } + + public void testAclPositive() throws Exception { + String ZK_PATH = "/zktokenstore-testAcl"; + Configuration conf = createConf(ZK_PATH); + conf.set( + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, + 
"ip:127.0.0.1:cdrwa,world:anyone:cdrwa"); + ts = new ZooKeeperTokenStore(); + ts.setConf(conf); + ts.init(null, DelegationTokenStore.TokenStoreServerMode.METASTORE); + List acl = zkClient.getACL().forPath(ZK_PATH + ServerMode.METASTORE); + assertEquals(2, acl.size()); + } + +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestDBTokenStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestDBTokenStore.java deleted file mode 100644 index 4bfa22419cb1ea5abe56773c1c76186b01996e03..0000000000000000000000000000000000000000 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestDBTokenStore.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.thrift; - -import java.io.IOException; -import java.util.List; - -import junit.framework.TestCase; - -import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.thrift.DelegationTokenStore.TokenStoreException; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; -import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; -import org.junit.Assert; - -public class TestDBTokenStore extends TestCase{ - - public void testDBTokenStore() throws TokenStoreException, MetaException, IOException { - - DelegationTokenStore ts = new DBTokenStore(); - ts.init(new HMSHandler("Test handler"), ServerMode.METASTORE); - assertEquals(0, ts.getMasterKeys().length); - assertEquals(false,ts.removeMasterKey(-1)); - try{ - ts.updateMasterKey(-1, "non-existent-key"); - fail("Updated non-existent key."); - } catch (TokenStoreException e) { - assertTrue(e.getCause() instanceof NoSuchObjectException); - } - int keySeq = ts.addMasterKey("key1Data"); - int keySeq2 = ts.addMasterKey("key2Data"); - int keySeq2same = ts.addMasterKey("key2Data"); - assertEquals("keys sequential", keySeq + 1, keySeq2); - assertEquals("keys sequential", keySeq + 2, keySeq2same); - assertEquals("expected number of keys", 3, ts.getMasterKeys().length); - assertTrue(ts.removeMasterKey(keySeq)); - assertTrue(ts.removeMasterKey(keySeq2same)); - assertEquals("expected number of keys", 1, ts.getMasterKeys().length); - assertEquals("key2Data",ts.getMasterKeys()[0]); - ts.updateMasterKey(keySeq2, "updatedData"); - assertEquals("updatedData",ts.getMasterKeys()[0]); - assertTrue(ts.removeMasterKey(keySeq2)); - - // tokens - 
assertEquals(0, ts.getAllDelegationTokenIdentifiers().size()); - DelegationTokenIdentifier tokenId = new DelegationTokenIdentifier( - new Text("owner"), new Text("renewer"), new Text("realUser")); - assertNull(ts.getToken(tokenId)); - assertFalse(ts.removeToken(tokenId)); - DelegationTokenInformation tokenInfo = new DelegationTokenInformation( - 99, "password".getBytes()); - assertTrue(ts.addToken(tokenId, tokenInfo)); - assertFalse(ts.addToken(tokenId, tokenInfo)); - DelegationTokenInformation tokenInfoRead = ts.getToken(tokenId); - assertEquals(tokenInfo.getRenewDate(), tokenInfoRead.getRenewDate()); - assertNotSame(tokenInfo, tokenInfoRead); - Assert.assertArrayEquals(HiveDelegationTokenSupport - .encodeDelegationTokenInformation(tokenInfo), - HiveDelegationTokenSupport - .encodeDelegationTokenInformation(tokenInfoRead)); - - List allIds = ts - .getAllDelegationTokenIdentifiers(); - assertEquals(1, allIds.size()); - Assert.assertEquals(TokenStoreDelegationTokenSecretManager - .encodeWritable(tokenId), - TokenStoreDelegationTokenSecretManager.encodeWritable(allIds - .get(0))); - - assertTrue(ts.removeToken(tokenId)); - assertEquals(0, ts.getAllDelegationTokenIdentifiers().size()); - assertNull(ts.getToken(tokenId)); - ts.close(); - } -} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java deleted file mode 100644 index 7800416a60239246e96497229d592736f2a9d79d..0000000000000000000000000000000000000000 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.thrift; - -import java.io.File; -import java.io.IOException; -import java.util.List; - -import junit.framework.TestCase; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; -import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.ACL; -import org.junit.Assert; - -public class TestZooKeeperTokenStore extends TestCase { - - private MiniZooKeeperCluster zkCluster = null; - private CuratorFramework zkClient = null; - private int zkPort = -1; - private ZooKeeperTokenStore ts; - - @Override - protected void setUp() throws Exception { - File zkDataDir = new File(System.getProperty("test.tmp.dir")); - if (this.zkCluster != null) { - throw new IOException("Cluster already running"); - } - this.zkCluster = new MiniZooKeeperCluster(); - this.zkPort = this.zkCluster.startup(zkDataDir); - this.zkClient = 
- CuratorFrameworkFactory.builder().connectString("localhost:" + zkPort) - .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); - this.zkClient.start(); - } - - @Override - protected void tearDown() throws Exception { - this.zkClient.close(); - if (ts != null) { - ts.close(); - } - this.zkCluster.shutdown(); - this.zkCluster = null; - } - - private Configuration createConf(String zkPath) { - Configuration conf = new Configuration(); - conf.set(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, "localhost:" - + this.zkPort); - conf.set(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE, zkPath); - return conf; - } - - public void testTokenStorage() throws Exception { - String ZK_PATH = "/zktokenstore-testTokenStorage"; - ts = new ZooKeeperTokenStore(); - Configuration conf = createConf(ZK_PATH); - conf.set(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, "world:anyone:cdrwa"); - ts.setConf(conf); - ts.init(null, ServerMode.METASTORE); - - - String metastore_zk_path = ZK_PATH + ServerMode.METASTORE; - int keySeq = ts.addMasterKey("key1Data"); - byte[] keyBytes = zkClient.getData().forPath( - metastore_zk_path + "/keys/" + String.format(ZooKeeperTokenStore.ZK_SEQ_FORMAT, keySeq)); - assertNotNull(keyBytes); - assertEquals(new String(keyBytes), "key1Data"); - - int keySeq2 = ts.addMasterKey("key2Data"); - assertEquals("keys sequential", keySeq + 1, keySeq2); - assertEquals("expected number keys", 2, ts.getMasterKeys().length); - - ts.removeMasterKey(keySeq); - assertEquals("expected number keys", 1, ts.getMasterKeys().length); - - // tokens - DelegationTokenIdentifier tokenId = new DelegationTokenIdentifier( - new Text("owner"), new Text("renewer"), new Text("realUser")); - DelegationTokenInformation tokenInfo = new DelegationTokenInformation( - 99, "password".getBytes()); - ts.addToken(tokenId, tokenInfo); - DelegationTokenInformation tokenInfoRead = ts.getToken(tokenId); - assertEquals(tokenInfo.getRenewDate(), 
tokenInfoRead.getRenewDate()); - assertNotSame(tokenInfo, tokenInfoRead); - Assert.assertArrayEquals(HiveDelegationTokenSupport - .encodeDelegationTokenInformation(tokenInfo), - HiveDelegationTokenSupport - .encodeDelegationTokenInformation(tokenInfoRead)); - - List allIds = ts.getAllDelegationTokenIdentifiers(); - assertEquals(1, allIds.size()); - Assert.assertEquals(TokenStoreDelegationTokenSecretManager - .encodeWritable(tokenId), - TokenStoreDelegationTokenSecretManager.encodeWritable(allIds - .get(0))); - - assertTrue(ts.removeToken(tokenId)); - assertEquals(0, ts.getAllDelegationTokenIdentifiers().size()); - assertNull(ts.getToken(tokenId)); - } - - public void testAclNoAuth() throws Exception { - String ZK_PATH = "/zktokenstore-testAclNoAuth"; - Configuration conf = createConf(ZK_PATH); - conf.set( - HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, - "ip:127.0.0.1:r"); - - ts = new ZooKeeperTokenStore(); - try { - ts.setConf(conf); - ts.init(null, ServerMode.METASTORE); - fail("expected ACL exception"); - } catch (DelegationTokenStore.TokenStoreException e) { - assertEquals(KeeperException.NoAuthException.class, e.getCause().getClass()); - } - } - - public void testAclInvalid() throws Exception { - String ZK_PATH = "/zktokenstore-testAclInvalid"; - String aclString = "sasl:hive/host@TEST.DOMAIN:cdrwa, fail-parse-ignored"; - Configuration conf = createConf(ZK_PATH); - conf.set( - HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, - aclString); - - List aclList = ZooKeeperTokenStore.parseACLs(aclString); - assertEquals(1, aclList.size()); - - ts = new ZooKeeperTokenStore(); - try { - ts.setConf(conf); - ts.init(null, ServerMode.METASTORE); - fail("expected ACL exception"); - } catch (DelegationTokenStore.TokenStoreException e) { - assertEquals(KeeperException.InvalidACLException.class, e.getCause().getClass()); - } - } - - public void testAclPositive() throws Exception { - String ZK_PATH = "/zktokenstore-testAcl"; - Configuration conf = 
createConf(ZK_PATH); - conf.set( - HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, - "ip:127.0.0.1:cdrwa,world:anyone:cdrwa"); - ts = new ZooKeeperTokenStore(); - ts.setConf(conf); - ts.init(null, ServerMode.METASTORE); - List acl = zkClient.getACL().forPath(ZK_PATH + ServerMode.METASTORE); - assertEquals(2, acl.size()); - } - -} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index cf33cca24ffc7583f2be62bb79815b5e43b2ff42..48f20ad0d7f387e86543c3e19cf1ce271ddd8a87 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -7656,8 +7656,7 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL)); // Start delegation token manager delegationTokenManager = new MetastoreDelegationTokenManager(); - delegationTokenManager.startDelegationTokenSecretManager(conf, baseHandler, - HadoopThriftAuthBridge.Server.ServerMode.METASTORE); + delegationTokenManager.startDelegationTokenSecretManager(conf, baseHandler); saslServer.setSecretManager(delegationTokenManager.getSecretManager()); transFactory = saslServer.createTransportFactory( MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL)); diff --git a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java index 541fe5ec8da9ba66ac44c65e6fe0dcd3c601943f..36f6e8aae245e4a819ec177ae6eccae6700fd823 100644 --- a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java +++ b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java @@ -27,13 +27,18 @@ import javax.security.sasl.AuthenticationException; import javax.security.sasl.Sasl; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.conf.HiveConfUtil; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.hive.thrift.DBTokenStore; -import org.apache.hadoop.hive.thrift.HiveDelegationTokenManager; +import org.apache.hadoop.hive.metastore.security.DBTokenStore; +import org.apache.hadoop.hive.metastore.security.DelegationTokenStore; +import org.apache.hadoop.hive.metastore.security.MemoryTokenStore; +import org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; @@ -90,7 +95,7 @@ public HiveAuthFactory(HiveConf conf) throws TTransportException { delegationTokenManager = new HiveDelegationTokenManager(); try { Object baseHandler = null; - String tokenStoreClass = conf.getVar(HiveConf.ConfVars.METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS); + String tokenStoreClass = HiveAuthFactory.getTokenStoreClassName(conf); if (tokenStoreClass.equals(DBTokenStore.class.getName())) { // IMetaStoreClient is needed to access token store if DBTokenStore is to be used. 
It @@ -104,7 +109,8 @@ public HiveAuthFactory(HiveConf conf) throws TTransportException { baseHandler = Hive.class; } - delegationTokenManager.startDelegationTokenSecretManager(conf, baseHandler, ServerMode.HIVESERVER2); + delegationTokenManager.startDelegationTokenSecretManager(conf, baseHandler, + DelegationTokenStore.TokenStoreServerMode.HIVESERVER2); saslServer.setSecretManager(delegationTokenManager.getSecretManager()); } catch (IOException e) { @@ -325,4 +331,38 @@ public static void verifyProxyAccess(String realUser, String proxyUser, String i } } + /** + * This method should be used to return the metastore specific tokenstore class name to maintain + * backwards compatibility + * + * @param conf - HiveConf object + * @return the tokenStoreClass name from the HiveConf. It maps the hive specific tokenstoreclass + * name to metastore module specific class name. For eg: + * hive.cluster.delegation.token.store.class is set to + * org.apache.hadoop.hive.thrift.MemoryTokenStore it returns the equivalent tokenstore + * class defined in the metastore module which is + * org.apache.hadoop.hive.metastore.security.MemoryTokenStore Similarly, + * org.apache.hadoop.hive.thrift.DBTokenStore maps to + * org.apache.hadoop.hive.metastore.security.DBTokenStore and + * org.apache.hadoop.hive.thrift.ZooKeeperTokenStore maps to + * org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore + */ + public static String getTokenStoreClassName(Configuration conf) { + String tokenStoreClass = + conf.get(HiveConf.ConfVars.METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS.varname, ""); + if (StringUtils.isBlank(tokenStoreClass)) { + // default tokenstore is MemoryTokenStore + return MemoryTokenStore.class.getName(); + } + switch (tokenStoreClass) { + case "org.apache.hadoop.hive.thrift.DBTokenStore": + return DBTokenStore.class.getName(); + case "org.apache.hadoop.hive.thrift.MemoryTokenStore": + return MemoryTokenStore.class.getName(); + case 
"org.apache.hadoop.hive.thrift.ZooKeeperTokenStore": + return ZooKeeperTokenStore.class.getName(); + default: + return tokenStoreClass; + } + } } diff --git a/service/src/java/org/apache/hive/service/auth/HiveDelegationTokenManager.java b/service/src/java/org/apache/hive/service/auth/HiveDelegationTokenManager.java new file mode 100644 index 0000000000000000000000000000000000000000..7e1a5e97ca2263561185746d644080d36e54e9ae --- /dev/null +++ b/service/src/java/org/apache/hive/service/auth/HiveDelegationTokenManager.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.hive.service.auth; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConfUtil; +import org.apache.hadoop.hive.metastore.security.DelegationTokenStore; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.hive.thrift.DelegationTokenSecretManager; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.util.ReflectionUtils; + +public class HiveDelegationTokenManager { + + public static final String DELEGATION_TOKEN_GC_INTERVAL = + "hive.cluster.delegation.token.gc-interval"; + private final static long DELEGATION_TOKEN_GC_INTERVAL_DEFAULT = 3600000; // 1 hour + // Delegation token related keys + public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY = + "hive.cluster.delegation.key.update-interval"; + public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT = + 24*60*60*1000; // 1 day + public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY = + "hive.cluster.delegation.token.renew-interval"; + public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = + 24*60*60*1000; // 1 day + public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY = + "hive.cluster.delegation.token.max-lifetime"; + public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = + 7*24*60*60*1000; // 7 days + + + protected DelegationTokenSecretManager secretManager; + + public HiveDelegationTokenManager() { + } + + public DelegationTokenSecretManager getSecretManager() { + return secretManager; + } + + public void startDelegationTokenSecretManager(Configuration conf, Object hms, DelegationTokenStore.TokenStoreServerMode serverMode) + throws IOException { + long secretKeyInterval = + conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY, DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); + long tokenMaxLifetime = + 
conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY, DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); + long tokenRenewInterval = + conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY, DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); + long tokenGcInterval = + conf.getLong(DELEGATION_TOKEN_GC_INTERVAL, DELEGATION_TOKEN_GC_INTERVAL_DEFAULT); + + DelegationTokenStore dts = getTokenStore(conf); + dts.setConf(conf); + dts.init(hms, serverMode); + secretManager = + new HiveTokenStoreDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime, + tokenRenewInterval, tokenGcInterval, dts); + secretManager.startThreads(); + } + + public String getDelegationToken(final String owner, final String renewer, String remoteAddr) + throws IOException, + InterruptedException { + /** + * If the user asking the token is same as the 'owner' then don't do + * any proxy authorization checks. For cases like oozie, where it gets + * a delegation token for another user, we need to make sure oozie is + * authorized to get a delegation token. + */ + // Do all checks on short names + UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); + UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner); + if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) { + // in the case of proxy users, the getCurrentUser will return the + // real user (for e.g. 
oozie) due to the doAs that happened just before the + // server started executing the method getDelegationToken in the MetaStore + ownerUgi = UserGroupInformation.createProxyUser(owner, UserGroupInformation.getCurrentUser()); + ProxyUsers.authorize(ownerUgi, remoteAddr, null); + } + return ownerUgi.doAs(new PrivilegedExceptionAction() { + + @Override + public String run() throws IOException { + return secretManager.getDelegationToken(renewer); + } + }); + } + + public String getDelegationTokenWithService(String owner, String renewer, String service, String remoteAddr) + throws IOException, InterruptedException { + String token = getDelegationToken(owner, renewer, remoteAddr); + return Utils.addServiceToToken(token, service); + } + + public long renewDelegationToken(String tokenStrForm) + throws IOException { + return secretManager.renewDelegationToken(tokenStrForm); + } + + public String getUserFromToken(String tokenStr) throws IOException { + return secretManager.getUserFromToken(tokenStr); + } + + public void cancelDelegationToken(String tokenStrForm) throws IOException { + secretManager.cancelDelegationToken(tokenStrForm); + } + + /** + * Verify token string + * @param tokenStrForm + * @return user name + * @throws IOException + */ + public String verifyDelegationToken(String tokenStrForm) throws IOException { + return secretManager.verifyDelegationToken(tokenStrForm); + } + + private DelegationTokenStore getTokenStore(Configuration conf) throws IOException { + String tokenStoreClassName = HiveAuthFactory.getTokenStoreClassName(conf); + try { + Class storeClass = + Class.forName(tokenStoreClassName).asSubclass(DelegationTokenStore.class); + return ReflectionUtils.newInstance(storeClass, conf); + } catch (ClassNotFoundException e) { + throw new IOException("Error initializing delegation token store: " + tokenStoreClassName, e); + } + } +} diff --git a/service/src/java/org/apache/hive/service/auth/HiveTokenStoreDelegationTokenSecretManager.java 
b/service/src/java/org/apache/hive/service/auth/HiveTokenStoreDelegationTokenSecretManager.java new file mode 100644 index 0000000000000000000000000000000000000000..7a5c565afe2de71e79617a04c53bbd36602acae7 --- /dev/null +++ b/service/src/java/org/apache/hive/service/auth/HiveTokenStoreDelegationTokenSecretManager.java @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.service.auth; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.hive.metastore.security.DelegationTokenStore; +import org.apache.hadoop.hive.thrift.DelegationTokenSecretManager; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extension of {@link DelegationTokenSecretManager} to support alternative to default in-memory + * token management for fail-over and clustering through plug-able token store (ZooKeeper etc.). + * Delegation tokens will be retrieved from the store on-demand and (unlike base class behavior) not + * cached in memory. This avoids complexities related to token expiration. The security token is + * needed only at the time the transport is opened (as opposed to per interface operation). The + * assumption therefore is low cost of interprocess token retrieval (for random read efficient store + * such as ZooKeeper) compared to overhead of synchronizing per-process in-memory token caches. + * The wrapper incorporates the token store abstraction within the limitations of current + * Hive/Hadoop dependency (.20S) with minimum code duplication. 
+ * Eventually this should be supported by Hadoop security directly. + */ +public class HiveTokenStoreDelegationTokenSecretManager extends DelegationTokenSecretManager { + + private static final Logger LOGGER = + LoggerFactory.getLogger(HiveTokenStoreDelegationTokenSecretManager.class.getName()); + + final private long keyUpdateInterval; + final private long tokenRemoverScanInterval; + private Thread tokenRemoverThread; + + final private DelegationTokenStore tokenStore; + + public HiveTokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval, + long delegationTokenMaxLifetime, long delegationTokenRenewInterval, + long delegationTokenRemoverScanInterval, + DelegationTokenStore sharedStore) { + super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, + delegationTokenRemoverScanInterval); + this.keyUpdateInterval = delegationKeyUpdateInterval; + this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval; + + this.tokenStore = sharedStore; + } + + protected Map reloadKeys() { + // read keys from token store + String[] allKeys = tokenStore.getMasterKeys(); + Map keys + = new HashMap(allKeys.length); + for (String keyStr : allKeys) { + DelegationKey key = new DelegationKey(); + try { + decodeWritable(key, keyStr); + keys.put(key.getKeyId(), key); + } catch (IOException ex) { + LOGGER.error("Failed to load master key.", ex); + } + } + synchronized (this) { + super.allKeys.clear(); + super.allKeys.putAll(keys); + } + return keys; + } + + @Override + public byte[] retrievePassword(AbstractDelegationTokenIdentifier identifier) throws InvalidToken { + DelegationTokenInformation info = this.tokenStore.getToken(identifier); + if (info == null) { + throw new InvalidToken("token expired or does not exist: " + identifier); + } + // must reuse super as info.getPassword is not accessible + synchronized (this) { + try { + super.currentTokens.put(identifier, info); + return super.retrievePassword(identifier); + } finally { + 
super.currentTokens.remove(identifier); + } + } + } + + @Override + public AbstractDelegationTokenIdentifier cancelToken(Token token, + String canceller) throws IOException { + AbstractDelegationTokenIdentifier id = getTokenIdentifier(token); + LOGGER.info("Token cancelation requested for identifier: "+id); + this.tokenStore.removeToken(id); + return id; + } + + /** + * Create the password and add it to shared store. + */ + @Override + protected byte[] createPassword(AbstractDelegationTokenIdentifier id) { + byte[] password; + DelegationTokenInformation info; + synchronized (this) { + password = super.createPassword(id); + // add new token to shared store + // need to persist expiration along with password + info = super.currentTokens.remove(id); + if (info == null) { + throw new IllegalStateException("Failed to retrieve token after creation"); + } + } + this.tokenStore.addToken(id, info); + return password; + } + + @Override + public long renewToken(Token token, + String renewer) throws InvalidToken, IOException { + // since renewal is KERBEROS authenticated token may not be cached + final AbstractDelegationTokenIdentifier id = getTokenIdentifier(token); + DelegationTokenInformation tokenInfo = this.tokenStore.getToken(id); + if (tokenInfo == null) { + throw new InvalidToken("token does not exist: " + id); // no token found + } + // ensure associated master key is available + if (!super.allKeys.containsKey(id.getMasterKeyId())) { + LOGGER.info("Unknown master key (id={}), (re)loading keys from token store.", + id.getMasterKeyId()); + reloadKeys(); + } + // reuse super renewal logic + synchronized (this) { + super.currentTokens.put(id, tokenInfo); + try { + return super.renewToken(token, renewer); + } finally { + super.currentTokens.remove(id); + } + } + } + + public static String encodeWritable(Writable key) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + key.write(dos); + 
dos.flush(); + return Base64.encodeBase64URLSafeString(bos.toByteArray()); + } + + public static void decodeWritable(Writable w, String idStr) throws IOException { + DataInputStream in = new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(idStr))); + w.readFields(in); + } + + /** + * Synchronize master key updates / sequence generation for multiple nodes. + * NOTE: {@link AbstractDelegationTokenSecretManager} keeps currentKey private, so we need + * to utilize this "hook" to manipulate the key through the object reference. + * This .20S workaround should cease to exist when Hadoop supports token store. + */ + @Override + protected void logUpdateMasterKey(DelegationKey key) throws IOException { + int keySeq = this.tokenStore.addMasterKey(encodeWritable(key)); + // update key with assigned identifier + DelegationKey keyWithSeq = new DelegationKey(keySeq, key.getExpiryDate(), key.getKey()); + String keyStr = encodeWritable(keyWithSeq); + this.tokenStore.updateMasterKey(keySeq, keyStr); + decodeWritable(key, keyStr); + LOGGER.info("New master key with key id={}", key.getKeyId()); + super.logUpdateMasterKey(key); + } + + @Override + public synchronized void startThreads() throws IOException { + try { + // updateCurrentKey needs to be called to initialize the master key + // (there should be a null check added in the future in rollMasterKey) + // updateCurrentKey(); + Method m = AbstractDelegationTokenSecretManager.class.getDeclaredMethod("updateCurrentKey"); + m.setAccessible(true); + m.invoke(this); + } catch (Exception e) { + throw new IOException("Failed to initialize master key", e); + } + running = true; + tokenRemoverThread = new Daemon(new ExpiredTokenRemover()); + tokenRemoverThread.start(); + } + + @Override + public synchronized void stopThreads() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Stopping expired delegation token remover thread"); + } + running = false; + if (tokenRemoverThread != null) { + tokenRemoverThread.interrupt(); + } + } 
+ + /** + * Remove expired tokens. Replaces logic in {@link AbstractDelegationTokenSecretManager} + * that cannot be reused due to private method access. Logic here can more efficiently + * deal with external token store by only loading into memory the minimum data needed. + */ + protected void removeExpiredTokens() { + long now = System.currentTimeMillis(); + Iterator i = tokenStore.getAllDelegationTokenIdentifiers() + .iterator(); + while (i.hasNext()) { + AbstractDelegationTokenIdentifier id = i.next(); + if (now > id.getMaxDate()) { + this.tokenStore.removeToken(id); // no need to look at token info + } else { + // get token info to check renew date + DelegationTokenInformation tokenInfo = tokenStore.getToken(id); + if (tokenInfo != null) { + if (now > tokenInfo.getRenewDate()) { + this.tokenStore.removeToken(id); + } + } + } + } + } + + /** + * Extension of rollMasterKey to remove expired keys from store. + * + * @throws IOException + */ + protected void rollMasterKeyExt() throws IOException { + Map keys = reloadKeys(); + int currentKeyId = super.currentId; + HiveDelegationTokenSupport.rollMasterKey(HiveTokenStoreDelegationTokenSecretManager.this); + List keysAfterRoll = Arrays.asList(getAllKeys()); + for (DelegationKey key : keysAfterRoll) { + keys.remove(key.getKeyId()); + if (key.getKeyId() == currentKeyId) { + tokenStore.updateMasterKey(currentKeyId, encodeWritable(key)); + } + } + for (DelegationKey expiredKey : keys.values()) { + LOGGER.info("Removing expired key id={}", expiredKey.getKeyId()); + try { + tokenStore.removeMasterKey(expiredKey.getKeyId()); + } catch (Exception e) { + LOGGER.error("Error removing expired key id={}", expiredKey.getKeyId(), e); + } + } + } + + /** + * Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access + * restriction (there would not be an need to clone the remove thread if the remove logic was + * protected/extensible). 
+ */ + protected class ExpiredTokenRemover extends Thread { + private long lastMasterKeyUpdate; + private long lastTokenCacheCleanup; + + @Override + public void run() { + LOGGER.info("Starting expired delegation token remover thread, " + + "tokenRemoverScanInterval=" + tokenRemoverScanInterval + / (60 * 1000) + " min(s)"); + while (running) { + try { + long now = System.currentTimeMillis(); + if (lastMasterKeyUpdate + keyUpdateInterval < now) { + try { + rollMasterKeyExt(); + lastMasterKeyUpdate = now; + } catch (IOException e) { + LOGGER.error("Master key updating failed. " + + StringUtils.stringifyException(e)); + } + } + if (lastTokenCacheCleanup + tokenRemoverScanInterval < now) { + removeExpiredTokens(); + lastTokenCacheCleanup = now; + } + try { + Thread.sleep(5000); // 5 seconds + } catch (InterruptedException ie) { + LOGGER + .error("InterruptedException received for ExpiredTokenRemover thread " + + ie); + } + } catch (Throwable t) { + LOGGER.error("ExpiredTokenRemover thread received unexpected exception. 
" + + t, t); + // Wait 5 seconds too in case of an exception, so we do not end up in busy waiting for + // the solution for this exception + try { + Thread.sleep(5000); // 5 seconds + } catch (InterruptedException ie) { + LOGGER.error("InterruptedException received for ExpiredTokenRemover thread during " + + "wait in exception sleep " + ie); + } + } + } + } + } + +} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java index 3c93186082ae363497dc94ca1303db4f746fd8b5..556879f1d52cf0468b9bca4fa00b3f243dabbc92 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java @@ -39,7 +39,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; import org.apache.hadoop.hive.thrift.DelegationTokenSelector; import org.apache.hadoop.io.Text; @@ -189,7 +188,7 @@ public JaasConfiguration(String hiveLoginContextName, String principal, String k krbOptions.put("useKeyTab", "true"); krbOptions.put("keyTab", keyTabFile); } - krbOptions.put("principal", principal); + krbOptions.put("principal", principal); krbOptions.put("refreshKrb5Config", "true"); AppConfigurationEntry hiveZooKeeperClientEntry = new AppConfigurationEntry( KerberosUtil.getKrb5LoginModuleName(), LoginModuleControlFlag.REQUIRED, krbOptions); @@ -203,6 +202,7 @@ public JaasConfiguration(String hiveLoginContextName, String principal, String k } } + public static final String XSRF_CUSTOM_HEADER_PARAM = "custom-header"; public static final String XSRF_CUSTOM_METHODS_TO_IGNORE_PARAM = "methods-to-ignore"; private static final String XSRF_HEADER_DEFAULT = "X-XSRF-HEADER"; diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java deleted file mode 100644 index 
d6dc0796e77591d3afca8dbd29c3aa0eff255dd0..0000000000000000000000000000000000000000 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.thrift; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; -import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DBTokenStore implements DelegationTokenStore { - private static final Logger LOG = LoggerFactory.getLogger(DBTokenStore.class); - private Configuration conf; - - @Override - public int addMasterKey(String s) throws TokenStoreException { - if (LOG.isTraceEnabled()) { - LOG.trace("addMasterKey: s = " + s); - } - return (Integer)invokeOnTokenStore("addMasterKey", new Object[]{s},String.class); - } - - @Override - public void updateMasterKey(int keySeq, String s) throws TokenStoreException { - if (LOG.isTraceEnabled()) { - LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq); - } - invokeOnTokenStore("updateMasterKey", new Object[] {Integer.valueOf(keySeq), s}, - Integer.class, String.class); - } - - @Override - public boolean removeMasterKey(int keySeq) { - return (Boolean)invokeOnTokenStore("removeMasterKey", new Object[] {Integer.valueOf(keySeq)}, - Integer.class); - } - - @Override - public String[] getMasterKeys() throws TokenStoreException { - return (String[])invokeOnTokenStore("getMasterKeys", new Object[0]); - } - - @Override - public boolean addToken(DelegationTokenIdentifier tokenIdentifier, - DelegationTokenInformation token) throws TokenStoreException { - - try { - String identifier = TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); - String tokenStr = Base64.encodeBase64URLSafeString( - 
HiveDelegationTokenSupport.encodeDelegationTokenInformation(token)); - boolean result = (Boolean)invokeOnTokenStore("addToken", new Object[] {identifier, tokenStr}, - String.class, String.class); - if (LOG.isTraceEnabled()) { - LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", added = " + result); - } - return result; - } catch (IOException e) { - throw new TokenStoreException(e); - } - } - - @Override - public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) - throws TokenStoreException { - try { - String tokenStr = (String)invokeOnTokenStore("getToken", new Object[] { - TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier)}, String.class); - DelegationTokenInformation result = null; - if (tokenStr != null) { - result = HiveDelegationTokenSupport.decodeDelegationTokenInformation(Base64.decodeBase64(tokenStr)); - } - if (LOG.isTraceEnabled()) { - LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result); - } - return result; - } catch (IOException e) { - throw new TokenStoreException(e); - } - } - - @Override - public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException{ - try { - boolean result = (Boolean)invokeOnTokenStore("removeToken", new Object[] { - TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier)}, String.class); - if (LOG.isTraceEnabled()) { - LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", removed = " + result); - } - return result; - } catch (IOException e) { - throw new TokenStoreException(e); - } - } - - @Override - public List getAllDelegationTokenIdentifiers() throws TokenStoreException{ - - List tokenIdents = (List)invokeOnTokenStore("getAllTokenIdentifiers", new Object[0]); - List delTokenIdents = new ArrayList(tokenIdents.size()); - - for (String tokenIdent : tokenIdents) { - DelegationTokenIdentifier delToken = new DelegationTokenIdentifier(); - try { - 
TokenStoreDelegationTokenSecretManager.decodeWritable(delToken, tokenIdent); - } catch (IOException e) { - throw new TokenStoreException(e); - } - delTokenIdents.add(delToken); - } - return delTokenIdents; - } - - private Object handler; - private ServerMode smode; - - @Override - public void init(Object handler, ServerMode smode) throws TokenStoreException { - this.handler = handler; - this.smode = smode; - } - - private Object invokeOnTokenStore(String methName, Object[] params, Class ... paramTypes) - throws TokenStoreException{ - Object tokenStore; - try { - switch (smode) { - case METASTORE : - tokenStore = handler.getClass().getMethod("getMS").invoke(handler); - break; - case HIVESERVER2 : - Object hiveObject = ((Class)handler) - .getMethod("get", org.apache.hadoop.conf.Configuration.class, java.lang.Class.class) - .invoke(handler, conf, DBTokenStore.class); - tokenStore = ((Class)handler).getMethod("getMSC").invoke(hiveObject); - break; - default: - throw new TokenStoreException(new Exception("unknown server mode")); - } - return tokenStore.getClass().getMethod(methName, paramTypes).invoke(tokenStore, params); - } catch (IllegalArgumentException e) { - throw new TokenStoreException(e); - } catch (SecurityException e) { - throw new TokenStoreException(e); - } catch (IllegalAccessException e) { - throw new TokenStoreException(e); - } catch (InvocationTargetException e) { - throw new TokenStoreException(e.getCause()); - } catch (NoSuchMethodException e) { - throw new TokenStoreException(e); - } - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void close() throws IOException { - // No-op. 
- } - -} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java index 5299e18743aa45c539287b335f95e8ce8df0fc35..a55c7dbbc2e1ae99a690c90a39612adbb6112c26 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java @@ -25,6 +25,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; /** @@ -33,7 +34,7 @@ * for each token. */ public class DelegationTokenSecretManager - extends AbstractDelegationTokenSecretManager { + extends AbstractDelegationTokenSecretManager { /** * Create a secret manager @@ -54,7 +55,7 @@ public DelegationTokenSecretManager(long delegationKeyUpdateInterval, } @Override - public DelegationTokenIdentifier createIdentifier() { + public AbstractDelegationTokenIdentifier createIdentifier() { return new DelegationTokenIdentifier(); } @@ -65,33 +66,33 @@ public DelegationTokenIdentifier createIdentifier() { * @throws IOException */ public synchronized String verifyDelegationToken(String tokenStrForm) throws IOException { - Token t = new Token(); + Token t = new Token(); t.decodeFromUrlString(tokenStrForm); - DelegationTokenIdentifier id = getTokenIdentifier(t); + AbstractDelegationTokenIdentifier id = getTokenIdentifier(t); verifyToken(id, t.getPassword()); return id.getUser().getShortUserName(); } - protected DelegationTokenIdentifier getTokenIdentifier(Token token) + protected AbstractDelegationTokenIdentifier getTokenIdentifier(Token token) throws IOException { // turn bytes back into identifier for cache lookup 
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); DataInputStream in = new DataInputStream(buf); - DelegationTokenIdentifier id = createIdentifier(); + AbstractDelegationTokenIdentifier id = createIdentifier(); id.readFields(in); return id; } public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException { - Token t= new Token(); + Token t= new Token(); t.decodeFromUrlString(tokenStrForm); String user = UserGroupInformation.getCurrentUser().getUserName(); cancelToken(t, user); } public synchronized long renewDelegationToken(String tokenStrForm) throws IOException { - Token t= new Token(); + Token t= new Token(); t.decodeFromUrlString(tokenStrForm); String user = UserGroupInformation.getCurrentUser().getUserName(); return renewToken(t, user); @@ -104,9 +105,9 @@ public synchronized String getDelegationToken(String renewer) throws IOException if (ugi.getRealUser() != null) { realUser = new Text(ugi.getRealUser().getUserName()); } - DelegationTokenIdentifier ident = + AbstractDelegationTokenIdentifier ident = new DelegationTokenIdentifier(owner, new Text(renewer), realUser); - Token t = new Token( + Token t = new Token( ident, this); return t.encodeToUrlString(); } @@ -117,7 +118,7 @@ public String getUserFromToken(String tokenStr) throws IOException { ByteArrayInputStream buf = new ByteArrayInputStream(delegationToken.getIdentifier()); DataInputStream in = new DataInputStream(buf); - DelegationTokenIdentifier id = createIdentifier(); + AbstractDelegationTokenIdentifier id = createIdentifier(); id.readFields(in); return id.getUser().getShortUserName(); } diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java deleted file mode 100644 index 867b4ed98859f681770fccfe7478ab519c823924..0000000000000000000000000000000000000000 --- 
a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.thrift; - -import java.io.Closeable; -import java.util.List; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; - -/** - * Interface for pluggable token store that can be implemented with shared external - * storage for load balancing and high availability (for example using ZooKeeper). - * Internal, store specific errors are translated into {@link TokenStoreException}. - */ -public interface DelegationTokenStore extends Configurable, Closeable { - - /** - * Exception for internal token store errors that typically cannot be handled by the caller. 
- */ - public static class TokenStoreException extends RuntimeException { - private static final long serialVersionUID = -8693819817623074083L; - - public TokenStoreException(Throwable cause) { - super(cause); - } - - public TokenStoreException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Add new master key. The token store assigns and returns the sequence number. - * Caller needs to use the identifier to update the key (since it is embedded in the key). - * - * @param s - * @return sequence number for new key - */ - int addMasterKey(String s) throws TokenStoreException; - - /** - * Update master key (for expiration and setting store assigned sequence within key) - * @param keySeq - * @param s - * @throws TokenStoreException - */ - void updateMasterKey(int keySeq, String s) throws TokenStoreException; - - /** - * Remove key for given id. - * @param keySeq - * @return false if key no longer present, true otherwise. - */ - boolean removeMasterKey(int keySeq); - - /** - * Return all master keys. - * @return - * @throws TokenStoreException - */ - String[] getMasterKeys() throws TokenStoreException; - - /** - * Add token. If identifier is already present, token won't be added. - * @param tokenIdentifier - * @param token - * @return true if token was added, false for existing identifier - */ - boolean addToken(DelegationTokenIdentifier tokenIdentifier, - DelegationTokenInformation token) throws TokenStoreException; - - /** - * Get token. Returns null if the token does not exist. - * @param tokenIdentifier - * @return - */ - DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) - throws TokenStoreException; - - /** - * Remove token. Return value can be used by caller to detect concurrency. - * @param tokenIdentifier - * @return true if token was removed, false if it was already removed. 
- * @throws TokenStoreException - */ - boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException; - - /** - * List of all token identifiers in the store. This is used to remove expired tokens - * and a potential scalability improvement would be to partition by master key id - * @return - */ - List getAllDelegationTokenIdentifiers() throws TokenStoreException; - - /** - * @param hmsHandler ObjectStore used by DBTokenStore - * @param smode Indicate whether this is a metastore or hiveserver2 token store - */ - void init(Object hmsHandler, ServerMode smode); - -} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java index 6c8b362270e48cb5641d77b09f1a19d875df0cbb..0ac0723b88440481b162a204b0848e609dcf5cc9 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java @@ -55,6 +55,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TProtocol; @@ -481,8 +482,8 @@ public SaslDigestCallbackHandler( this.secretManager = secretManager; } - private char[] getPassword(DelegationTokenIdentifier tokenid) throws InvalidToken { - return encodePassword(secretManager.retrievePassword(tokenid)); + private char[] getPassword(AbstractDelegationTokenIdentifier tokenIdentifier) throws InvalidToken { + return encodePassword(secretManager.retrievePassword(tokenIdentifier)); } private char[] encodePassword(byte[] password) { @@ -511,7 +512,7 @@ public void handle(Callback[] callbacks) throws 
InvalidToken, } } if (pc != null) { - DelegationTokenIdentifier tokenIdentifier = SaslRpcServer. + AbstractDelegationTokenIdentifier tokenIdentifier = SaslRpcServer. getIdentifier(nc.getDefaultName(), secretManager); char[] password = getPassword(tokenIdentifier); diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HiveDelegationTokenManager.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HiveDelegationTokenManager.java deleted file mode 100644 index b3e4a7608282be603e79d1d101679e239a5219b0..0000000000000000000000000000000000000000 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HiveDelegationTokenManager.java +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.hadoop.hive.thrift; - -import java.io.IOException; -import java.net.InetAddress; -import java.security.PrivilegedExceptionAction; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import org.apache.hadoop.security.authorize.AuthorizationException; -import org.apache.hadoop.security.authorize.ProxyUsers; -import org.apache.hadoop.util.ReflectionUtils; - -public class HiveDelegationTokenManager { - - public static final String DELEGATION_TOKEN_GC_INTERVAL = - "hive.cluster.delegation.token.gc-interval"; - private final static long DELEGATION_TOKEN_GC_INTERVAL_DEFAULT = 3600000; // 1 hour - // Delegation token related keys - public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY = - "hive.cluster.delegation.key.update-interval"; - public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT = - 24*60*60*1000; // 1 day - public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY = - "hive.cluster.delegation.token.renew-interval"; - public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = - 24*60*60*1000; // 1 day - public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY = - "hive.cluster.delegation.token.max-lifetime"; - public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = - 7*24*60*60*1000; // 7 days - public static final String DELEGATION_TOKEN_STORE_CLS = - "hive.cluster.delegation.token.store.class"; - public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = - "hive.cluster.delegation.token.store.zookeeper.connectString"; - // Alternate connect string specification configuration - public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE = - "hive.zookeeper.quorum"; - - 
public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS = - "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis"; - public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE = - "hive.cluster.delegation.token.store.zookeeper.znode"; - public static final String DELEGATION_TOKEN_STORE_ZK_ACL = - "hive.cluster.delegation.token.store.zookeeper.acl"; - public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = - "/hivedelegation"; - - protected DelegationTokenSecretManager secretManager; - - public HiveDelegationTokenManager() { - } - - public DelegationTokenSecretManager getSecretManager() { - return secretManager; - } - - public void startDelegationTokenSecretManager(Configuration conf, Object hms, ServerMode smode) - throws IOException { - long secretKeyInterval = - conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY, DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); - long tokenMaxLifetime = - conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY, DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); - long tokenRenewInterval = - conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY, DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); - long tokenGcInterval = - conf.getLong(DELEGATION_TOKEN_GC_INTERVAL, DELEGATION_TOKEN_GC_INTERVAL_DEFAULT); - - DelegationTokenStore dts = getTokenStore(conf); - dts.setConf(conf); - dts.init(hms, smode); - secretManager = - new TokenStoreDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime, - tokenRenewInterval, tokenGcInterval, dts); - secretManager.startThreads(); - } - - public String getDelegationToken(final String owner, final String renewer, String remoteAddr) - throws IOException, - InterruptedException { - /** - * If the user asking the token is same as the 'owner' then don't do - * any proxy authorization checks. For cases like oozie, where it gets - * a delegation token for another user, we need to make sure oozie is - * authorized to get a delegation token. 
- */ - // Do all checks on short names - UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); - UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner); - if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) { - // in the case of proxy users, the getCurrentUser will return the - // real user (for e.g. oozie) due to the doAs that happened just before the - // server started executing the method getDelegationToken in the MetaStore - ownerUgi = UserGroupInformation.createProxyUser(owner, UserGroupInformation.getCurrentUser()); - ProxyUsers.authorize(ownerUgi, remoteAddr, null); - } - return ownerUgi.doAs(new PrivilegedExceptionAction() { - - @Override - public String run() throws IOException { - return secretManager.getDelegationToken(renewer); - } - }); - } - - public String getDelegationTokenWithService(String owner, String renewer, String service, String remoteAddr) - throws IOException, InterruptedException { - String token = getDelegationToken(owner, renewer, remoteAddr); - return Utils.addServiceToToken(token, service); - } - - public long renewDelegationToken(String tokenStrForm) - throws IOException { - return secretManager.renewDelegationToken(tokenStrForm); - } - - public String getUserFromToken(String tokenStr) throws IOException { - return secretManager.getUserFromToken(tokenStr); - } - - public void cancelDelegationToken(String tokenStrForm) throws IOException { - secretManager.cancelDelegationToken(tokenStrForm); - } - - /** - * Verify token string - * @param tokenStrForm - * @return user name - * @throws IOException - */ - public String verifyDelegationToken(String tokenStrForm) throws IOException { - return secretManager.verifyDelegationToken(tokenStrForm); - } - - private DelegationTokenStore getTokenStore(Configuration conf) throws IOException { - String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, ""); - if (StringUtils.isBlank(tokenStoreClassName)) { - return new 
MemoryTokenStore(); - } - try { - Class storeClass = - Class.forName(tokenStoreClassName).asSubclass(DelegationTokenStore.class); - return ReflectionUtils.newInstance(storeClass, conf); - } catch (ClassNotFoundException e) { - throw new IOException("Error initializing delegation token store: " + tokenStoreClassName, e); - } - } - - -} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java deleted file mode 100644 index 9d837b8fa19ed53546c4a95944484cc1f06a21e6..0000000000000000000000000000000000000000 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.thrift; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Default in-memory token store implementation. - */ -public class MemoryTokenStore implements DelegationTokenStore { - private static final Logger LOG = LoggerFactory.getLogger(MemoryTokenStore.class); - - private final Map masterKeys - = new ConcurrentHashMap(); - - private final ConcurrentHashMap tokens - = new ConcurrentHashMap(); - - private final AtomicInteger masterKeySeq = new AtomicInteger(); - private Configuration conf; - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - @Override - public Configuration getConf() { - return this.conf; - } - - @Override - public int addMasterKey(String s) { - int keySeq = masterKeySeq.getAndIncrement(); - if (LOG.isTraceEnabled()) { - LOG.trace("addMasterKey: s = " + s + ", keySeq = " + keySeq); - } - masterKeys.put(keySeq, s); - return keySeq; - } - - @Override - public void updateMasterKey(int keySeq, String s) { - if (LOG.isTraceEnabled()) { - LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq); - } - masterKeys.put(keySeq, s); - } - - @Override - public boolean removeMasterKey(int keySeq) { - if (LOG.isTraceEnabled()) { - LOG.trace("removeMasterKey: keySeq = " + keySeq); - } - return masterKeys.remove(keySeq) != null; - } - - @Override - public String[] getMasterKeys() { - return masterKeys.values().toArray(new String[0]); - } - - @Override - public boolean addToken(DelegationTokenIdentifier tokenIdentifier, - 
DelegationTokenInformation token) { - DelegationTokenInformation tokenInfo = tokens.putIfAbsent(tokenIdentifier, token); - if (LOG.isTraceEnabled()) { - LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", added = " + (tokenInfo == null)); - } - return (tokenInfo == null); - } - - @Override - public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) { - DelegationTokenInformation tokenInfo = tokens.remove(tokenIdentifier); - if (LOG.isTraceEnabled()) { - LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", removed = " + (tokenInfo != null)); - } - return tokenInfo != null; - } - - @Override - public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) { - DelegationTokenInformation result = tokens.get(tokenIdentifier); - if (LOG.isTraceEnabled()) { - LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result); - } - return result; - } - - @Override - public List getAllDelegationTokenIdentifiers() { - List result = new ArrayList( - tokens.size()); - for (DelegationTokenIdentifier id : tokens.keySet()) { - result.add(id); - } - return result; - } - - @Override - public void close() throws IOException { - //no-op - } - - @Override - public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException { - // no-op - } -} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java deleted file mode 100644 index 4719b85e8d2f9045ae61e0e86c6624b865aa2653..0000000000000000000000000000000000000000 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java +++ /dev/null @@ -1,335 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.thrift; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; -import org.apache.hadoop.security.token.delegation.DelegationKey; -import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; -import org.apache.hadoop.util.Daemon; -import org.apache.hadoop.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Extension of {@link DelegationTokenSecretManager} to support alternative to default in-memory - * token management for fail-over and clustering through plug-able token store (ZooKeeper etc.). - * Delegation tokens will be retrieved from the store on-demand and (unlike base class behavior) not - * cached in memory. This avoids complexities related to token expiration. 
The security token is - * needed only at the time the transport is opened (as opposed to per interface operation). The - * assumption therefore is low cost of interprocess token retrieval (for random read efficient store - * such as ZooKeeper) compared to overhead of synchronizing per-process in-memory token caches. - * The wrapper incorporates the token store abstraction within the limitations of current - * Hive/Hadoop dependency (.20S) with minimum code duplication. - * Eventually this should be supported by Hadoop security directly. - */ -public class TokenStoreDelegationTokenSecretManager extends DelegationTokenSecretManager { - - private static final Logger LOGGER = - LoggerFactory.getLogger(TokenStoreDelegationTokenSecretManager.class.getName()); - - final private long keyUpdateInterval; - final private long tokenRemoverScanInterval; - private Thread tokenRemoverThread; - - final private DelegationTokenStore tokenStore; - - public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval, - long delegationTokenMaxLifetime, long delegationTokenRenewInterval, - long delegationTokenRemoverScanInterval, - DelegationTokenStore sharedStore) { - super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, - delegationTokenRemoverScanInterval); - this.keyUpdateInterval = delegationKeyUpdateInterval; - this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval; - - this.tokenStore = sharedStore; - } - - protected Map reloadKeys() { - // read keys from token store - String[] allKeys = tokenStore.getMasterKeys(); - Map keys - = new HashMap(allKeys.length); - for (String keyStr : allKeys) { - DelegationKey key = new DelegationKey(); - try { - decodeWritable(key, keyStr); - keys.put(key.getKeyId(), key); - } catch (IOException ex) { - LOGGER.error("Failed to load master key.", ex); - } - } - synchronized (this) { - super.allKeys.clear(); - super.allKeys.putAll(keys); - } - return keys; - } - - @Override - public 
byte[] retrievePassword(DelegationTokenIdentifier identifier) throws InvalidToken { - DelegationTokenInformation info = this.tokenStore.getToken(identifier); - if (info == null) { - throw new InvalidToken("token expired or does not exist: " + identifier); - } - // must reuse super as info.getPassword is not accessible - synchronized (this) { - try { - super.currentTokens.put(identifier, info); - return super.retrievePassword(identifier); - } finally { - super.currentTokens.remove(identifier); - } - } - } - - @Override - public DelegationTokenIdentifier cancelToken(Token token, - String canceller) throws IOException { - DelegationTokenIdentifier id = getTokenIdentifier(token); - LOGGER.info("Token cancelation requested for identifier: "+id); - this.tokenStore.removeToken(id); - return id; - } - - /** - * Create the password and add it to shared store. - */ - @Override - protected byte[] createPassword(DelegationTokenIdentifier id) { - byte[] password; - DelegationTokenInformation info; - synchronized (this) { - password = super.createPassword(id); - // add new token to shared store - // need to persist expiration along with password - info = super.currentTokens.remove(id); - if (info == null) { - throw new IllegalStateException("Failed to retrieve token after creation"); - } - } - this.tokenStore.addToken(id, info); - return password; - } - - @Override - public long renewToken(Token token, - String renewer) throws InvalidToken, IOException { - // since renewal is KERBEROS authenticated token may not be cached - final DelegationTokenIdentifier id = getTokenIdentifier(token); - DelegationTokenInformation tokenInfo = this.tokenStore.getToken(id); - if (tokenInfo == null) { - throw new InvalidToken("token does not exist: " + id); // no token found - } - // ensure associated master key is available - if (!super.allKeys.containsKey(id.getMasterKeyId())) { - LOGGER.info("Unknown master key (id={}), (re)loading keys from token store.", - id.getMasterKeyId()); - 
reloadKeys(); - } - // reuse super renewal logic - synchronized (this) { - super.currentTokens.put(id, tokenInfo); - try { - return super.renewToken(token, renewer); - } finally { - super.currentTokens.remove(id); - } - } - } - - public static String encodeWritable(Writable key) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - key.write(dos); - dos.flush(); - return Base64.encodeBase64URLSafeString(bos.toByteArray()); - } - - public static void decodeWritable(Writable w, String idStr) throws IOException { - DataInputStream in = new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(idStr))); - w.readFields(in); - } - - /** - * Synchronize master key updates / sequence generation for multiple nodes. - * NOTE: {@link AbstractDelegationTokenSecretManager} keeps currentKey private, so we need - * to utilize this "hook" to manipulate the key through the object reference. - * This .20S workaround should cease to exist when Hadoop supports token store. 
- */ - @Override - protected void logUpdateMasterKey(DelegationKey key) throws IOException { - int keySeq = this.tokenStore.addMasterKey(encodeWritable(key)); - // update key with assigned identifier - DelegationKey keyWithSeq = new DelegationKey(keySeq, key.getExpiryDate(), key.getKey()); - String keyStr = encodeWritable(keyWithSeq); - this.tokenStore.updateMasterKey(keySeq, keyStr); - decodeWritable(key, keyStr); - LOGGER.info("New master key with key id={}", key.getKeyId()); - super.logUpdateMasterKey(key); - } - - @Override - public synchronized void startThreads() throws IOException { - try { - // updateCurrentKey needs to be called to initialize the master key - // (there should be a null check added in the future in rollMasterKey) - // updateCurrentKey(); - Method m = AbstractDelegationTokenSecretManager.class.getDeclaredMethod("updateCurrentKey"); - m.setAccessible(true); - m.invoke(this); - } catch (Exception e) { - throw new IOException("Failed to initialize master key", e); - } - running = true; - tokenRemoverThread = new Daemon(new ExpiredTokenRemover()); - tokenRemoverThread.start(); - } - - @Override - public synchronized void stopThreads() { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Stopping expired delegation token remover thread"); - } - running = false; - if (tokenRemoverThread != null) { - tokenRemoverThread.interrupt(); - } - } - - /** - * Remove expired tokens. Replaces logic in {@link AbstractDelegationTokenSecretManager} - * that cannot be reused due to private method access. Logic here can more efficiently - * deal with external token store by only loading into memory the minimum data needed. 
- */ - protected void removeExpiredTokens() { - long now = System.currentTimeMillis(); - Iterator i = tokenStore.getAllDelegationTokenIdentifiers() - .iterator(); - while (i.hasNext()) { - DelegationTokenIdentifier id = i.next(); - if (now > id.getMaxDate()) { - this.tokenStore.removeToken(id); // no need to look at token info - } else { - // get token info to check renew date - DelegationTokenInformation tokenInfo = tokenStore.getToken(id); - if (tokenInfo != null) { - if (now > tokenInfo.getRenewDate()) { - this.tokenStore.removeToken(id); - } - } - } - } - } - - /** - * Extension of rollMasterKey to remove expired keys from store. - * - * @throws IOException - */ - protected void rollMasterKeyExt() throws IOException { - Map keys = reloadKeys(); - int currentKeyId = super.currentId; - HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this); - List keysAfterRoll = Arrays.asList(getAllKeys()); - for (DelegationKey key : keysAfterRoll) { - keys.remove(key.getKeyId()); - if (key.getKeyId() == currentKeyId) { - tokenStore.updateMasterKey(currentKeyId, encodeWritable(key)); - } - } - for (DelegationKey expiredKey : keys.values()) { - LOGGER.info("Removing expired key id={}", expiredKey.getKeyId()); - try { - tokenStore.removeMasterKey(expiredKey.getKeyId()); - } catch (Exception e) { - LOGGER.error("Error removing expired key id={}", expiredKey.getKeyId(), e); - } - } - } - - /** - * Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access - * restriction (there would not be an need to clone the remove thread if the remove logic was - * protected/extensible). 
- */ - protected class ExpiredTokenRemover extends Thread { - private long lastMasterKeyUpdate; - private long lastTokenCacheCleanup; - - @Override - public void run() { - LOGGER.info("Starting expired delegation token remover thread, " - + "tokenRemoverScanInterval=" + tokenRemoverScanInterval - / (60 * 1000) + " min(s)"); - while (running) { - try { - long now = System.currentTimeMillis(); - if (lastMasterKeyUpdate + keyUpdateInterval < now) { - try { - rollMasterKeyExt(); - lastMasterKeyUpdate = now; - } catch (IOException e) { - LOGGER.error("Master key updating failed. " - + StringUtils.stringifyException(e)); - } - } - if (lastTokenCacheCleanup + tokenRemoverScanInterval < now) { - removeExpiredTokens(); - lastTokenCacheCleanup = now; - } - try { - Thread.sleep(5000); // 5 seconds - } catch (InterruptedException ie) { - LOGGER - .error("InterruptedException received for ExpiredTokenRemover thread " - + ie); - } - } catch (Throwable t) { - LOGGER.error("ExpiredTokenRemover thread received unexpected exception. " - + t, t); - // Wait 5 seconds too in case of an exception, so we do not end up in busy waiting for - // the solution for this exception - try { - Thread.sleep(5000); // 5 seconds - } catch (InterruptedException ie) { - LOGGER.error("InterruptedException received for ExpiredTokenRemover thread during " + - "wait in exception sleep " + ie); - } - } - } - } - } - -} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java deleted file mode 100644 index 885ec56d543e1c46e74dc148c73f249803b7604c..0000000000000000000000000000000000000000 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java +++ /dev/null @@ -1,476 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.thrift; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang.StringUtils; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.ACLProvider; -import org.apache.curator.framework.imps.CuratorFrameworkState; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; -import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooDefs.Perms; -import org.apache.zookeeper.data.ACL; -import 
org.apache.zookeeper.data.Id; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * ZooKeeper token store implementation. - */ -public class ZooKeeperTokenStore implements DelegationTokenStore { - - private static final Logger LOGGER = - LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName()); - - protected static final String ZK_SEQ_FORMAT = "%010d"; - private static final String NODE_KEYS = "/keys"; - private static final String NODE_TOKENS = "/tokens"; - - private String rootNode = ""; - private volatile CuratorFramework zkSession; - private String zkConnectString; - private int connectTimeoutMillis; - private List newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS)); - - /** - * ACLProvider permissions will be used in case parent dirs need to be created - */ - private final ACLProvider aclDefaultProvider = new ACLProvider() { - - @Override - public List getDefaultAcl() { - return newNodeAcl; - } - - @Override - public List getAclForPath(String path) { - return getDefaultAcl(); - } - }; - - - private ServerMode serverMode; - - private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled" - + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")"; - - private Configuration conf; - - /** - * Default constructor for dynamic instantiation w/ Configurable - * (ReflectionUtils does not support Configuration constructor injection). 
- */ - protected ZooKeeperTokenStore() { - } - - private CuratorFramework getSession() { - if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) { - synchronized (this) { - if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) { - zkSession = - CuratorFrameworkFactory.builder().connectString(zkConnectString) - .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider) - .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); - zkSession.start(); - } - } - } - return zkSession; - } - - private void setupJAASConfig(Configuration conf) throws IOException { - if (!UserGroupInformation.getLoginUser().isFromKeytab()) { - // The process has not logged in using keytab - // this should be a test mode, can't use keytab to authenticate - // with zookeeper. - LOGGER.warn("Login is not from keytab"); - return; - } - - String principal; - String keytab; - switch (serverMode) { - case METASTORE: - principal = getNonEmptyConfVar(conf, "hive.metastore.kerberos.principal"); - keytab = getNonEmptyConfVar(conf, "hive.metastore.kerberos.keytab.file"); - break; - case HIVESERVER2: - principal = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.principal"); - keytab = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.keytab"); - break; - default: - throw new AssertionError("Unexpected server mode " + serverMode); - } - Utils.setZookeeperClientKerberosJaasConfig(principal, keytab); - } - - private String getNonEmptyConfVar(Configuration conf, String param) throws IOException { - String val = conf.get(param); - if (val == null || val.trim().isEmpty()) { - throw new IOException("Configuration parameter " + param + " should be set, " - + WHEN_ZK_DSTORE_MSG); - } - return val; - } - - /** - * Create a path if it does not already exist ("mkdir -p") - * @param path string with '/' separator - * @param acl list of ACL entries - * @throws TokenStoreException - */ - public void ensurePath(String path, 
List acl) - throws TokenStoreException { - try { - CuratorFramework zk = getSession(); - String node = zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) - .withACL(acl).forPath(path); - LOGGER.info("Created path: {} ", node); - } catch (KeeperException.NodeExistsException e) { - // node already exists - } catch (Exception e) { - throw new TokenStoreException("Error creating path " + path, e); - } - } - - /** - * Parse ACL permission string, from ZooKeeperMain private method - * @param permString - * @return - */ - public static int getPermFromString(String permString) { - int perm = 0; - for (int i = 0; i < permString.length(); i++) { - switch (permString.charAt(i)) { - case 'r': - perm |= ZooDefs.Perms.READ; - break; - case 'w': - perm |= ZooDefs.Perms.WRITE; - break; - case 'c': - perm |= ZooDefs.Perms.CREATE; - break; - case 'd': - perm |= ZooDefs.Perms.DELETE; - break; - case 'a': - perm |= ZooDefs.Perms.ADMIN; - break; - default: - LOGGER.error("Unknown perm type: " + permString.charAt(i)); - } - } - return perm; - } - - /** - * Parse comma separated list of ACL entries to secure generated nodes, e.g. 
- * sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa - * @param aclString - * @return ACL list - */ - public static List parseACLs(String aclString) { - String[] aclComps = StringUtils.splitByWholeSeparator(aclString, ","); - List acl = new ArrayList(aclComps.length); - for (String a : aclComps) { - if (StringUtils.isBlank(a)) { - continue; - } - a = a.trim(); - // from ZooKeeperMain private method - int firstColon = a.indexOf(':'); - int lastColon = a.lastIndexOf(':'); - if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) { - LOGGER.error(a + " does not have the form scheme:id:perm"); - continue; - } - ACL newAcl = new ACL(); - newAcl.setId(new Id(a.substring(0, firstColon), a.substring( - firstColon + 1, lastColon))); - newAcl.setPerms(getPermFromString(a.substring(lastColon + 1))); - acl.add(newAcl); - } - return acl; - } - - private void initClientAndPaths() { - if (this.zkSession != null) { - this.zkSession.close(); - } - try { - ensurePath(rootNode + NODE_KEYS, newNodeAcl); - ensurePath(rootNode + NODE_TOKENS, newNodeAcl); - } catch (TokenStoreException e) { - throw e; - } - } - - @Override - public void setConf(Configuration conf) { - if (conf == null) { - throw new IllegalArgumentException("conf is null"); - } - this.conf = conf; - } - - @Override - public Configuration getConf() { - return null; // not required - } - - private Map getAllKeys() throws KeeperException, InterruptedException { - - String masterKeyNode = rootNode + NODE_KEYS; - - // get children of key node - List nodes = zkGetChildren(masterKeyNode); - - // read each child node, add to results - Map result = new HashMap(); - for (String node : nodes) { - String nodePath = masterKeyNode + "/" + node; - byte[] data = zkGetData(nodePath); - if (data != null) { - result.put(getSeq(node), data); - } - } - return result; - } - - private List zkGetChildren(String path) { - CuratorFramework zk = getSession(); - try { - return zk.getChildren().forPath(path); - } catch 
(Exception e) { - throw new TokenStoreException("Error getting children for " + path, e); - } - } - - private byte[] zkGetData(String nodePath) { - CuratorFramework zk = getSession(); - try { - return zk.getData().forPath(nodePath); - } catch (KeeperException.NoNodeException ex) { - return null; - } catch (Exception e) { - throw new TokenStoreException("Error reading " + nodePath, e); - } - } - - private int getSeq(String path) { - String[] pathComps = path.split("/"); - return Integer.parseInt(pathComps[pathComps.length-1]); - } - - @Override - public int addMasterKey(String s) { - String keysPath = rootNode + NODE_KEYS + "/"; - CuratorFramework zk = getSession(); - String newNode; - try { - newNode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(newNodeAcl) - .forPath(keysPath, s.getBytes()); - } catch (Exception e) { - throw new TokenStoreException("Error creating new node with path " + keysPath, e); - } - LOGGER.info("Added key {}", newNode); - return getSeq(newNode); - } - - @Override - public void updateMasterKey(int keySeq, String s) { - CuratorFramework zk = getSession(); - String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); - try { - zk.setData().forPath(keyPath, s.getBytes()); - } catch (Exception e) { - throw new TokenStoreException("Error setting data in " + keyPath, e); - } - } - - @Override - public boolean removeMasterKey(int keySeq) { - String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); - zkDelete(keyPath); - return true; - } - - private void zkDelete(String path) { - CuratorFramework zk = getSession(); - try { - zk.delete().forPath(path); - } catch (KeeperException.NoNodeException ex) { - // already deleted - } catch (Exception e) { - throw new TokenStoreException("Error deleting " + path, e); - } - } - - @Override - public String[] getMasterKeys() { - try { - Map allKeys = getAllKeys(); - String[] result = new String[allKeys.size()]; - int resultIdx = 0; - for 
(byte[] keyBytes : allKeys.values()) { - result[resultIdx++] = new String(keyBytes); - } - return result; - } catch (KeeperException ex) { - throw new TokenStoreException(ex); - } catch (InterruptedException ex) { - throw new TokenStoreException(ex); - } - } - - - private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) { - try { - return rootNode + NODE_TOKENS + "/" - + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); - } catch (IOException ex) { - throw new TokenStoreException("Failed to encode token identifier", ex); - } - } - - @Override - public boolean addToken(DelegationTokenIdentifier tokenIdentifier, - DelegationTokenInformation token) { - byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token); - String tokenPath = getTokenPath(tokenIdentifier); - CuratorFramework zk = getSession(); - String newNode; - try { - newNode = zk.create().withMode(CreateMode.PERSISTENT).withACL(newNodeAcl) - .forPath(tokenPath, tokenBytes); - } catch (Exception e) { - throw new TokenStoreException("Error creating new node with path " + tokenPath, e); - } - - LOGGER.info("Added token: {}", newNode); - return true; - } - - @Override - public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) { - String tokenPath = getTokenPath(tokenIdentifier); - zkDelete(tokenPath); - return true; - } - - @Override - public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) { - byte[] tokenBytes = zkGetData(getTokenPath(tokenIdentifier)); - if(tokenBytes == null) { - // The token is already removed. 
- return null; - } - try { - return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes); - } catch (Exception ex) { - throw new TokenStoreException("Failed to decode token", ex); - } - } - - @Override - public List getAllDelegationTokenIdentifiers() { - String containerNode = rootNode + NODE_TOKENS; - final List nodes = zkGetChildren(containerNode); - List result = new java.util.ArrayList( - nodes.size()); - for (String node : nodes) { - DelegationTokenIdentifier id = new DelegationTokenIdentifier(); - try { - TokenStoreDelegationTokenSecretManager.decodeWritable(id, node); - result.add(id); - } catch (Exception e) { - LOGGER.warn("Failed to decode token '{}'", node); - } - } - return result; - } - - @Override - public void close() throws IOException { - if (this.zkSession != null) { - this.zkSession.close(); - } - } - - @Override - public void init(Object hmsHandler, ServerMode smode) { - this.serverMode = smode; - zkConnectString = - conf.get(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null); - if (zkConnectString == null || zkConnectString.trim().isEmpty()) { - // try alternate config param - zkConnectString = - conf.get( - HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, - null); - if (zkConnectString == null || zkConnectString.trim().isEmpty()) { - throw new IllegalArgumentException("Zookeeper connect string has to be specifed through " - + "either " + HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR - + " or " - + HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE - + WHEN_ZK_DSTORE_MSG); - } - } - connectTimeoutMillis = - conf.getInt( - HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, - CuratorFrameworkFactory.builder().getConnectionTimeoutMs()); - String aclStr = conf.get(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, null); - if (StringUtils.isNotBlank(aclStr)) { - this.newNodeAcl = parseACLs(aclStr); - } - 
rootNode = - conf.get(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE, - HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode; - - try { - // Install the JAAS Configuration for the runtime - setupJAASConfig(conf); - } catch (IOException e) { - throw new TokenStoreException("Error setting up JAAS configuration for zookeeper client " - + e.getMessage(), e); - } - initClientAndPaths(); - } - -} diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DBTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DBTokenStore.java new file mode 100644 index 0000000000000000000000000000000000000000..e3fa8771e673e1e9c65eebfc8f6ac7108237a18c --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DBTokenStore.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.security; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.MetastoreDelegationTokenSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DBTokenStore implements DelegationTokenStore { + private static final Logger LOG = LoggerFactory.getLogger(DBTokenStore.class); + private Configuration conf; + + @Override + public int addMasterKey(String s) throws TokenStoreException { + if (LOG.isTraceEnabled()) { + LOG.trace("addMasterKey: s = " + s); + } + return (Integer)invokeOnTokenStore("addMasterKey", new Object[]{s},String.class); + } + + @Override + public void updateMasterKey(int keySeq, String s) throws TokenStoreException { + if (LOG.isTraceEnabled()) { + LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq); + } + invokeOnTokenStore("updateMasterKey", new Object[] {Integer.valueOf(keySeq), s}, + Integer.class, String.class); + } + + @Override + public boolean removeMasterKey(int keySeq) { + return (Boolean)invokeOnTokenStore("removeMasterKey", new Object[] {Integer.valueOf(keySeq)}, + Integer.class); + } + + @Override + public String[] getMasterKeys() throws TokenStoreException { + return (String[])invokeOnTokenStore("getMasterKeys", new Object[0]); + } + + @Override + public boolean addToken(AbstractDelegationTokenIdentifier tokenIdentifier, + DelegationTokenInformation token) throws TokenStoreException { + + try { + String identifier = TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); + String tokenStr = 
Base64.encodeBase64URLSafeString( + MetastoreDelegationTokenSupport.encodeDelegationTokenInformation(token)); + boolean result = (Boolean)invokeOnTokenStore("addToken", new Object[] {identifier, tokenStr}, + String.class, String.class); + if (LOG.isTraceEnabled()) { + LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", added = " + result); + } + return result; + } catch (IOException e) { + throw new TokenStoreException(e); + } + } + + @Override + public DelegationTokenInformation getToken(AbstractDelegationTokenIdentifier tokenIdentifier) + throws TokenStoreException { + try { + String tokenStr = (String)invokeOnTokenStore("getToken", new Object[] { + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier)}, String.class); + DelegationTokenInformation result = null; + if (tokenStr != null) { + result = MetastoreDelegationTokenSupport.decodeDelegationTokenInformation(Base64.decodeBase64(tokenStr)); + } + if (LOG.isTraceEnabled()) { + LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result); + } + return result; + } catch (IOException e) { + throw new TokenStoreException(e); + } + } + + @Override + public boolean removeToken(AbstractDelegationTokenIdentifier tokenIdentifier) throws TokenStoreException{ + try { + boolean result = (Boolean)invokeOnTokenStore("removeToken", new Object[] { + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier)}, String.class); + if (LOG.isTraceEnabled()) { + LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", removed = " + result); + } + return result; + } catch (IOException e) { + throw new TokenStoreException(e); + } + } + + @Override + public List getAllDelegationTokenIdentifiers() throws TokenStoreException{ + + List tokenIdents = (List)invokeOnTokenStore("getAllTokenIdentifiers", new Object[0]); + List delTokenIdents = new ArrayList(tokenIdents.size()); + + for (String tokenIdent : tokenIdents) { + DelegationTokenIdentifier delToken = new 
DelegationTokenIdentifier(); + try { + TokenStoreDelegationTokenSecretManager.decodeWritable(delToken, tokenIdent); + } catch (IOException e) { + throw new TokenStoreException(e); + } + delTokenIdents.add(delToken); + } + return delTokenIdents; + } + + private Object handler; + private TokenStoreServerMode serverMode; + + @Override + public void init(Object handler, TokenStoreServerMode serverMode) throws TokenStoreException { + this.handler = handler; + this.serverMode = serverMode; + } + + private Object invokeOnTokenStore(String methName, Object[] params, Class ... paramTypes) + throws TokenStoreException{ + Object tokenStore; + try { + switch (serverMode) { + case METASTORE: + tokenStore = handler.getClass().getMethod("getMS").invoke(handler); + break; + case HIVESERVER2: + Object hiveObject = ((Class) handler) + .getMethod("get", org.apache.hadoop.conf.Configuration.class, java.lang.Class.class) + .invoke(handler, conf, DBTokenStore.class); + tokenStore = ((Class) handler).getMethod("getMSC").invoke(hiveObject); + break; + default: + throw new IllegalArgumentException("Unexpected value of Server mode " + serverMode); + } + return tokenStore.getClass().getMethod(methName, paramTypes).invoke(tokenStore, params); + } catch (IllegalArgumentException e) { + throw new TokenStoreException(e); + } catch (SecurityException e) { + throw new TokenStoreException(e); + } catch (IllegalAccessException e) { + throw new TokenStoreException(e); + } catch (InvocationTargetException e) { + throw new TokenStoreException(e.getCause()); + } catch (NoSuchMethodException e) { + throw new TokenStoreException(e); + } + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void close() throws IOException { + // No-op. 
+ } + +} diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java index 0cafeff89614c94ba551a5f1ba9c4d892ee5720a..6f84fe9925e7bfaf96b846558eaac4a469bb48f5 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; /** @@ -31,6 +31,10 @@ */ public interface DelegationTokenStore extends Configurable, Closeable { + static enum TokenStoreServerMode { + METASTORE, + HIVESERVER2 + } /** * Exception for internal token store errors that typically cannot be handled by the caller. 
*/ @@ -83,7 +87,7 @@ public TokenStoreException(String message, Throwable cause) { * @param token * @return true if token was added, false for existing identifier */ - boolean addToken(DelegationTokenIdentifier tokenIdentifier, + boolean addToken(AbstractDelegationTokenIdentifier tokenIdentifier, DelegationTokenInformation token) throws TokenStoreException; /** @@ -91,7 +95,7 @@ boolean addToken(DelegationTokenIdentifier tokenIdentifier, * @param tokenIdentifier * @return */ - DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) + DelegationTokenInformation getToken(AbstractDelegationTokenIdentifier tokenIdentifier) throws TokenStoreException; /** @@ -100,19 +104,19 @@ DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) * @return true if token was removed, false if it was already removed. * @throws TokenStoreException */ - boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException; + boolean removeToken(AbstractDelegationTokenIdentifier tokenIdentifier) throws TokenStoreException; /** * List of all token identifiers in the store. 
This is used to remove expired tokens * and a potential scalability improvement would be to partition by master key id * @return */ - List getAllDelegationTokenIdentifiers() throws TokenStoreException; + List getAllDelegationTokenIdentifiers() throws TokenStoreException; /** * @param hmsHandler ObjectStore used by DBTokenStore - * @param smode Indicate whether this is a metastore or hiveserver2 token store + * @param serverMode indicates whether this token store is for the Metastore or HiveServer2 */ - void init(Object hmsHandler, ServerMode smode); + void init(Object hmsHandler, TokenStoreServerMode serverMode); } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java index c484cd3132d08c8c0493fa98e79fa449e84ef9b8..faf4840d1b13cb3684d3232202f73d526463be8e 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +39,7 @@ private final Map masterKeys = new ConcurrentHashMap<>(); - private final ConcurrentHashMap tokens + private final ConcurrentHashMap tokens = new ConcurrentHashMap<>(); private final AtomicInteger masterKeySeq = new AtomicInteger(); @@ -87,7 +87,7 @@ public boolean removeMasterKey(int keySeq) { } @Override - public boolean addToken(DelegationTokenIdentifier tokenIdentifier, +
public boolean addToken(AbstractDelegationTokenIdentifier tokenIdentifier, DelegationTokenInformation token) { DelegationTokenInformation tokenInfo = tokens.putIfAbsent(tokenIdentifier, token); if (LOG.isTraceEnabled()) { @@ -97,7 +97,7 @@ public boolean addToken(DelegationTokenIdentifier tokenIdentifier, } @Override - public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) { + public boolean removeToken(AbstractDelegationTokenIdentifier tokenIdentifier) { DelegationTokenInformation tokenInfo = tokens.remove(tokenIdentifier); if (LOG.isTraceEnabled()) { LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", removed = " + (tokenInfo != null)); @@ -106,7 +106,7 @@ public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) { } @Override - public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) { + public DelegationTokenInformation getToken(AbstractDelegationTokenIdentifier tokenIdentifier) { DelegationTokenInformation result = tokens.get(tokenIdentifier); if (LOG.isTraceEnabled()) { LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result); @@ -115,10 +115,10 @@ public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdenti } @Override - public List getAllDelegationTokenIdentifiers() { - List result = new ArrayList<>( + public List getAllDelegationTokenIdentifiers() { + List result = new ArrayList<>( tokens.size()); - for (DelegationTokenIdentifier id : tokens.keySet()) { + for (AbstractDelegationTokenIdentifier id : tokens.keySet()) { result.add(id); } return result; @@ -130,7 +130,7 @@ public void close() throws IOException { } @Override - public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException { + public void init(Object hmsHandler, TokenStoreServerMode isEmbedded) throws TokenStoreException { // no-op } } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java index 2b0110fe0badcb571144067539031fb6ac81276b..ec3cbada7b7e661592eb8fadfa2d34a8b885a11c 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java @@ -26,6 +26,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.security.DelegationTokenStore.TokenStoreServerMode; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; @@ -33,8 +34,20 @@ import org.apache.hadoop.util.ReflectionUtils; public class MetastoreDelegationTokenManager { - + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = + "hive.cluster.delegation.token.store.zookeeper.connectString"; protected DelegationTokenSecretManager secretManager; + // Alternate connect string specification configuration + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE = + "hive.zookeeper.quorum"; + + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS = + "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis"; + public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE = + "hive.cluster.delegation.token.store.zookeeper.znode"; + public static final String DELEGATION_TOKEN_STORE_ZK_ACL = + "hive.cluster.delegation.token.store.zookeeper.acl"; + public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = "/hivedelegation"; public MetastoreDelegationTokenManager() { } @@ -43,7 +56,11 @@ public DelegationTokenSecretManager getSecretManager() { return secretManager; } - public void 
startDelegationTokenSecretManager(Configuration conf, Object hms, HadoopThriftAuthBridge.Server.ServerMode smode) + public void startDelegationTokenSecretManager(Configuration conf, Object hms) throws IOException { + startDelegationTokenSecretManager(conf, hms, TokenStoreServerMode.METASTORE); + } + + public void startDelegationTokenSecretManager(Configuration conf, Object hms, TokenStoreServerMode isEmbedded) throws IOException { long secretKeyInterval = MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.DELEGATION_KEY_UPDATE_INTERVAL, TimeUnit.MILLISECONDS); @@ -56,7 +73,7 @@ public void startDelegationTokenSecretManager(Configuration conf, Object hms, Ha DelegationTokenStore dts = getTokenStore(conf); dts.setConf(conf); - dts.init(hms, smode); + dts.init(hms, isEmbedded); secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, tokenGcInterval, dts); diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TokenStoreDelegationTokenSecretManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TokenStoreDelegationTokenSecretManager.java index 4abcec789476982389b3c0bc34d4990c59dbf9da..e4a52777bcac797231fc9df149ddaf2f7cbc2c9c 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TokenStoreDelegationTokenSecretManager.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TokenStoreDelegationTokenSecretManager.java @@ -33,6 +33,7 @@ import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.io.Writable; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.security.token.delegation.DelegationKey; import 
org.apache.hadoop.security.token.delegation.MetastoreDelegationTokenSupport; @@ -233,10 +234,10 @@ public synchronized void stopThreads() { */ protected void removeExpiredTokens() { long now = System.currentTimeMillis(); - Iterator i = tokenStore.getAllDelegationTokenIdentifiers() + Iterator i = tokenStore.getAllDelegationTokenIdentifiers() .iterator(); while (i.hasNext()) { - DelegationTokenIdentifier id = i.next(); + AbstractDelegationTokenIdentifier id = i.next(); if (now > id.getMaxDate()) { this.tokenStore.removeToken(id); // no need to look at token info } else { diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java new file mode 100644 index 0000000000000000000000000000000000000000..79db401eda37428009b7b6c8e924a20699b80ab6 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java @@ -0,0 +1,475 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.security; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.MetastoreDelegationTokenSupport; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs.Perms; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ZooKeeper token store implementation. 
+ */ +public class ZooKeeperTokenStore implements DelegationTokenStore { + + private static final Logger LOGGER = + LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName()); + + protected static final String ZK_SEQ_FORMAT = "%010d"; + private static final String NODE_KEYS = "/keys"; + private static final String NODE_TOKENS = "/tokens"; + + private String rootNode = ""; + private volatile CuratorFramework zkSession; + private String zkConnectString; + private int connectTimeoutMillis; + private List newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS)); + + /** + * ACLProvider permissions will be used in case parent dirs need to be created + */ + private final ACLProvider aclDefaultProvider = new ACLProvider() { + + @Override + public List getDefaultAcl() { + return newNodeAcl; + } + + @Override + public List getAclForPath(String path) { + return getDefaultAcl(); + } + }; + + + private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled" + + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")"; + + private Configuration conf; + + private TokenStoreServerMode serverMode; + + /** + * Default constructor for dynamic instantiation w/ Configurable + * (ReflectionUtils does not support Configuration constructor injection). 
+ */ + protected ZooKeeperTokenStore() { + } + + private CuratorFramework getSession() { + if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) { + synchronized (this) { + if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) { + zkSession = + CuratorFrameworkFactory.builder().connectString(zkConnectString) + .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + zkSession.start(); + } + } + } + return zkSession; + } + + private void setupJAASConfig(Configuration conf) throws IOException { + if (!UserGroupInformation.getLoginUser().isFromKeytab()) { + // The process has not logged in using keytab + // this should be a test mode, can't use keytab to authenticate + // with zookeeper. + LOGGER.warn("Login is not from keytab"); + return; + } + + String principal; + String keytab; + switch (serverMode) { + case METASTORE: + principal = getNonEmptyConfVar(conf, "hive.metastore.kerberos.principal"); + keytab = getNonEmptyConfVar(conf, "hive.metastore.kerberos.keytab.file"); + break; + case HIVESERVER2: + principal = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.principal"); + keytab = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.keytab"); + break; + default: + throw new AssertionError("Unexpected server mode " + serverMode); + } + SecurityUtils.setZookeeperClientKerberosJaasConfig(principal, keytab); + } + + private String getNonEmptyConfVar(Configuration conf, String param) throws IOException { + String val = conf.get(param); + if (val == null || val.trim().isEmpty()) { + throw new IOException("Configuration parameter " + param + " should be set, " + + WHEN_ZK_DSTORE_MSG); + } + return val; + } + + /** + * Create a path if it does not already exist ("mkdir -p") + * @param path string with '/' separator + * @param acl list of ACL entries + * @throws TokenStoreException + */ + public void 
ensurePath(String path, List acl) + throws TokenStoreException { + try { + CuratorFramework zk = getSession(); + String node = zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .withACL(acl).forPath(path); + LOGGER.info("Created path: {} ", node); + } catch (KeeperException.NodeExistsException e) { + // node already exists + } catch (Exception e) { + throw new TokenStoreException("Error creating path " + path, e); + } + } + + /** + * Parse ACL permission string, from ZooKeeperMain private method + * @param permString + * @return + */ + public static int getPermFromString(String permString) { + int perm = 0; + for (int i = 0; i < permString.length(); i++) { + switch (permString.charAt(i)) { + case 'r': + perm |= ZooDefs.Perms.READ; + break; + case 'w': + perm |= ZooDefs.Perms.WRITE; + break; + case 'c': + perm |= ZooDefs.Perms.CREATE; + break; + case 'd': + perm |= ZooDefs.Perms.DELETE; + break; + case 'a': + perm |= ZooDefs.Perms.ADMIN; + break; + default: + LOGGER.error("Unknown perm type: " + permString.charAt(i)); + } + } + return perm; + } + + /** + * Parse comma separated list of ACL entries to secure generated nodes, e.g. 
+ * sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa + * @param aclString + * @return ACL list + */ + public static List parseACLs(String aclString) { + String[] aclComps = StringUtils.splitByWholeSeparator(aclString, ","); + List acl = new ArrayList(aclComps.length); + for (String a : aclComps) { + if (StringUtils.isBlank(a)) { + continue; + } + a = a.trim(); + // from ZooKeeperMain private method + int firstColon = a.indexOf(':'); + int lastColon = a.lastIndexOf(':'); + if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) { + LOGGER.error(a + " does not have the form scheme:id:perm"); + continue; + } + ACL newAcl = new ACL(); + newAcl.setId(new Id(a.substring(0, firstColon), a.substring( + firstColon + 1, lastColon))); + newAcl.setPerms(getPermFromString(a.substring(lastColon + 1))); + acl.add(newAcl); + } + return acl; + } + + private void initClientAndPaths() { + if (this.zkSession != null) { + this.zkSession.close(); + } + try { + ensurePath(rootNode + NODE_KEYS, newNodeAcl); + ensurePath(rootNode + NODE_TOKENS, newNodeAcl); + } catch (TokenStoreException e) { + throw e; + } + } + + @Override + public void setConf(Configuration conf) { + if (conf == null) { + throw new IllegalArgumentException("conf is null"); + } + this.conf = conf; + } + + @Override + public Configuration getConf() { + return null; // not required + } + + private Map getAllKeys() throws KeeperException, InterruptedException { + + String masterKeyNode = rootNode + NODE_KEYS; + + // get children of key node + List nodes = zkGetChildren(masterKeyNode); + + // read each child node, add to results + Map result = new HashMap(); + for (String node : nodes) { + String nodePath = masterKeyNode + "/" + node; + byte[] data = zkGetData(nodePath); + if (data != null) { + result.put(getSeq(node), data); + } + } + return result; + } + + private List zkGetChildren(String path) { + CuratorFramework zk = getSession(); + try { + return zk.getChildren().forPath(path); + } catch 
(Exception e) { + throw new TokenStoreException("Error getting children for " + path, e); + } + } + + private byte[] zkGetData(String nodePath) { + CuratorFramework zk = getSession(); + try { + return zk.getData().forPath(nodePath); + } catch (KeeperException.NoNodeException ex) { + return null; + } catch (Exception e) { + throw new TokenStoreException("Error reading " + nodePath, e); + } + } + + private int getSeq(String path) { + String[] pathComps = path.split("/"); + return Integer.parseInt(pathComps[pathComps.length-1]); + } + + @Override + public int addMasterKey(String s) { + String keysPath = rootNode + NODE_KEYS + "/"; + CuratorFramework zk = getSession(); + String newNode; + try { + newNode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(newNodeAcl) + .forPath(keysPath, s.getBytes()); + } catch (Exception e) { + throw new TokenStoreException("Error creating new node with path " + keysPath, e); + } + LOGGER.info("Added key {}", newNode); + return getSeq(newNode); + } + + @Override + public void updateMasterKey(int keySeq, String s) { + CuratorFramework zk = getSession(); + String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); + try { + zk.setData().forPath(keyPath, s.getBytes()); + } catch (Exception e) { + throw new TokenStoreException("Error setting data in " + keyPath, e); + } + } + + @Override + public boolean removeMasterKey(int keySeq) { + String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); + zkDelete(keyPath); + return true; + } + + private void zkDelete(String path) { + CuratorFramework zk = getSession(); + try { + zk.delete().forPath(path); + } catch (KeeperException.NoNodeException ex) { + // already deleted + } catch (Exception e) { + throw new TokenStoreException("Error deleting " + path, e); + } + } + + @Override + public String[] getMasterKeys() { + try { + Map allKeys = getAllKeys(); + String[] result = new String[allKeys.size()]; + int resultIdx = 0; + for 
(byte[] keyBytes : allKeys.values()) { + result[resultIdx++] = new String(keyBytes); + } + return result; + } catch (KeeperException ex) { + throw new TokenStoreException(ex); + } catch (InterruptedException ex) { + throw new TokenStoreException(ex); + } + } + + + private String getTokenPath(AbstractDelegationTokenIdentifier tokenIdentifier) { + try { + return rootNode + NODE_TOKENS + "/" + + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); + } catch (IOException ex) { + throw new TokenStoreException("Failed to encode token identifier", ex); + } + } + + @Override + public boolean addToken(AbstractDelegationTokenIdentifier tokenIdentifier, + DelegationTokenInformation token) { + byte[] tokenBytes = MetastoreDelegationTokenSupport.encodeDelegationTokenInformation(token); + String tokenPath = getTokenPath(tokenIdentifier); + CuratorFramework zk = getSession(); + String newNode; + try { + newNode = zk.create().withMode(CreateMode.PERSISTENT).withACL(newNodeAcl) + .forPath(tokenPath, tokenBytes); + } catch (Exception e) { + throw new TokenStoreException("Error creating new node with path " + tokenPath, e); + } + + LOGGER.info("Added token: {}", newNode); + return true; + } + + @Override + public boolean removeToken(AbstractDelegationTokenIdentifier tokenIdentifier) { + String tokenPath = getTokenPath(tokenIdentifier); + zkDelete(tokenPath); + return true; + } + + @Override + public DelegationTokenInformation getToken(AbstractDelegationTokenIdentifier tokenIdentifier) { + byte[] tokenBytes = zkGetData(getTokenPath(tokenIdentifier)); + if(tokenBytes == null) { + // The token is already removed. 
+ return null; + } + try { + return MetastoreDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes); + } catch (Exception ex) { + throw new TokenStoreException("Failed to decode token", ex); + } + } + + @Override + public List getAllDelegationTokenIdentifiers() { + String containerNode = rootNode + NODE_TOKENS; + final List nodes = zkGetChildren(containerNode); + List result = new java.util.ArrayList( + nodes.size()); + for (String node : nodes) { + DelegationTokenIdentifier id = new DelegationTokenIdentifier(); + try { + TokenStoreDelegationTokenSecretManager.decodeWritable(id, node); + result.add(id); + } catch (Exception e) { + LOGGER.warn("Failed to decode token '{}'", node); + } + } + return result; + } + + @Override + public void close() throws IOException { + if (this.zkSession != null) { + this.zkSession.close(); + } + } + + @Override + public void init(Object hmsHandler, TokenStoreServerMode serverMode) { + this.serverMode = serverMode; + zkConnectString = + conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null); + if (zkConnectString == null || zkConnectString.trim().isEmpty()) { + // try alternate config param + zkConnectString = + conf.get( + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, + null); + if (zkConnectString == null || zkConnectString.trim().isEmpty()) { + throw new IllegalArgumentException("Zookeeper connect string has to be specifed through " + + "either " + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR + + " or " + + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE + + WHEN_ZK_DSTORE_MSG); + } + } + connectTimeoutMillis = + conf.getInt( + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, + CuratorFrameworkFactory.builder().getConnectionTimeoutMs()); + String aclStr = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, null); + if 
(StringUtils.isNotBlank(aclStr)) { + this.newNodeAcl = parseACLs(aclStr); + } + rootNode = + conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE, + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode; + + try { + // Install the JAAS Configuration for the runtime + setupJAASConfig(conf); + } catch (IOException e) { + throw new TokenStoreException("Error setting up JAAS configuration for zookeeper client " + + e.getMessage(), e); + } + initClientAndPaths(); + } + +} diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java index 9f0ca829b1e3a469a3275c54a74d90278fc030fa..69508bb47468a2699c86d07b66a3f491e86f23d4 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java @@ -17,10 +17,18 @@ */ package org.apache.hadoop.hive.metastore.utils; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.apache.zookeeper.client.ZooKeeperSaslClient; +import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.LoginException; +import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; + import java.io.IOException; +import java.util.HashMap; +import java.util.Map; public class SecurityUtils { public static UserGroupInformation getUGI() throws LoginException, IOException { @@ -36,5 +44,69 @@ public static UserGroupInformation getUGI() throws LoginException, IOException { } return UserGroupInformation.getCurrentUser(); } + /** + * Dynamically sets up the JAAS configuration that uses kerberos + * @param principal + * @param keyTabFile + * @throws IOException + */ + public 
// SecurityUtils (standalone-metastore) — JAAS helper for the ZooKeeper token store.
  /**
   * Dynamically sets up the JAAS configuration that uses kerberos
   * @param principal Kerberos principal; _HOST is expanded against 0.0.0.0
   * @param keyTabFile path to the keytab used for login
   * @throws IOException if the principal cannot be resolved
   */
  public static void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile)
      throws IOException {
    // ZooKeeper property name to pick the correct JAAS conf section
    final String SASL_LOGIN_CONTEXT_NAME = "HiveZooKeeperClient";
    System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME);

    principal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0");
    JaasConfiguration jaasConf =
        new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal, keyTabFile);

    // Install the Configuration in the runtime.
    javax.security.auth.login.Configuration.setConfiguration(jaasConf);
  }

  /**
   * A JAAS configuration for ZooKeeper clients intended to use for SASL
   * Kerberos. Delegates any other login context to the previously installed
   * configuration.
   */
  private static class JaasConfiguration extends javax.security.auth.login.Configuration {
    // IBM JDKs use different Krb5LoginModule option names than OpenJDK/Oracle.
    private static final boolean IBM_JAVA = System.getProperty("java.vendor")
        .contains("IBM");
    // Configuration installed before ours; consulted for non-Hive app names.
    private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration
        .getConfiguration();
    private final String loginContextName;
    private final String principal;
    private final String keyTabFile;

    public JaasConfiguration(String hiveLoginContextName, String principal, String keyTabFile) {
      this.loginContextName = hiveLoginContextName;
      this.principal = principal;
      this.keyTabFile = keyTabFile;
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
      if (loginContextName.equals(appName)) {
        Map<String, String> krbOptions = new HashMap<String, String>();
        if (IBM_JAVA) {
          krbOptions.put("credsType", "both");
          krbOptions.put("useKeytab", keyTabFile);
        } else {
          krbOptions.put("doNotPrompt", "true");
          krbOptions.put("storeKey", "true");
          krbOptions.put("useKeyTab", "true");
          krbOptions.put("keyTab", keyTabFile);
        }
        krbOptions.put("principal", principal);
        krbOptions.put("refreshKrb5Config", "true");
        AppConfigurationEntry hiveZooKeeperClientEntry = new AppConfigurationEntry(
            KerberosUtil.getKrb5LoginModuleName(), LoginModuleControlFlag.REQUIRED, krbOptions);
        return new AppConfigurationEntry[] { hiveZooKeeperClientEntry };
      }
      // Try the base config
      if (baseConfig != null) {
        return baseConfig.getAppConfigurationEntry(appName);
      }
      return null;
    }
  }
}