diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 7a89a0c..c367e6f 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -46,6 +46,7 @@ import javax.sql.DataSource; import java.io.IOException; +import java.io.PrintWriter; import java.sql.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -2911,15 +2912,9 @@ private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws S if (connPool != null) return; String driverUrl = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORECONNECTURLKEY); - String user = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME); - String passwd; - try { - passwd = ShimLoader.getHadoopShims().getPassword(conf, - HiveConf.ConfVars.METASTOREPWD.varname); - } catch (IOException err) { - throw new SQLException("Error getting metastore password", err); - } - String connectionPooler = HiveConf.getVar(conf, + String user = getMetastoreJdbcUser(conf); + String passwd = getMetastoreJdbcPasswd(conf); + String connectionPooler = conf.getVar( HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE).toLowerCase(); if ("bonecp".equals(connectionPooler)) { @@ -2940,8 +2935,11 @@ private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws S // This doesn't get used, but it's still necessary, see // http://svn.apache.org/viewvc/commons/proper/dbcp/branches/DBCP_1_4_x_BRANCH/doc/ManualPoolingDataSourceExample.java?view=markup PoolableConnectionFactory poolConnFactory = - new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true); + new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true); connPool = new PoolingDataSource(objectPool); + } else if ("none".equals(connectionPooler)) { + LOG.info("Choosing not to pool JDBC connections"); + connPool = new NoPoolConnectionPool(conf); } else { throw new RuntimeException("Unknown JDBC connection pooling " + connectionPooler); } @@ -3410,4 +3408,118 @@ private String addLimitClause(int numRows, String noSelectsqlQuery) throws MetaE } } } + + private static String getMetastoreJdbcUser(HiveConf conf) { + return conf.getVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME); + } + + private static String getMetastoreJdbcPasswd(HiveConf conf) throws SQLException { + try { + return ShimLoader.getHadoopShims().getPassword(conf, + HiveConf.ConfVars.METASTOREPWD.varname); + } catch (IOException err) { + throw new SQLException("Error getting metastore password", err); + } + } + + private static class NoPoolConnectionPool implements DataSource { + // Note that this depends on the fact that no-one in this class calls anything but + // getConnection. If you want to use any of the Logger or wrap calls you'll have to + // implement them. + private final HiveConf conf; + private Driver driver; + private String connString; + private String user; + private String passwd; + + public NoPoolConnectionPool(HiveConf conf) { + this.conf = conf; + } + + @Override + public Connection getConnection() throws SQLException { + if (user == null) { + user = getMetastoreJdbcUser(conf); + passwd = getMetastoreJdbcPasswd(conf); + } + return getConnection(user, passwd); + } + + @Override + public Connection getConnection(String username, String password) throws SQLException { + // Find the JDBC driver + if (driver == null) { + String driverName = conf.getVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER); + if (driverName == null || driverName.equals("")) { + String msg = "JDBC driver for transaction db not set in configuration " + + "file, need to set " + HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER.varname; + LOG.error(msg); + throw new RuntimeException(msg); + } + try { + LOG.info("Going to load JDBC driver " + driverName); + driver = (Driver) Class.forName(driverName).newInstance(); + } catch (InstantiationException e) { + throw new RuntimeException("Unable to instantiate driver " + driverName + ", " + + e.getMessage(), e); + } catch (IllegalAccessException e) { + throw new RuntimeException( + "Unable to access driver " + driverName + ", " + e.getMessage(), + e); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Unable to find driver " + driverName + ", " + e.getMessage(), + e); + } + connString = conf.getVar(HiveConf.ConfVars.METASTORECONNECTURLKEY); + } + + try { + LOG.info("Connecting to transaction db with connection string " + connString); + Properties connectionProps = new Properties(); + connectionProps.setProperty("user", username); + connectionProps.setProperty("password", password); + Connection conn = driver.connect(connString, connectionProps); + conn.setAutoCommit(false); + return conn; + } catch (SQLException e) { + throw new RuntimeException("Unable to connect to transaction manager using " + connString + + ", " + e.getMessage(), e); + } + } + + @Override + public PrintWriter getLogWriter() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void setLogWriter(PrintWriter out) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void setLoginTimeout(int seconds) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public int getLoginTimeout() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException { + throw new UnsupportedOperationException(); + } + + @Override + public T unwrap(Class iface) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + throw new UnsupportedOperationException(); + } + }; } diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNoConnectionPool.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNoConnectionPool.java new file mode 100644 index 0000000..e5f4dde --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNoConnectionPool.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.TxnState; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.List; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + +/** + * This test checks that the transaction handler works when the connection pool is set to none. + */ +public class TestTxnHandlerNoConnectionPool { + private static final Logger LOG = + LoggerFactory.getLogger(TestTxnHandlerNoConnectionPool.class.getName()); + + private HiveConf conf = new HiveConf(); + private TxnStore txnHandler; + + @Before + public void setUp() throws Exception { + conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE, "None"); + TxnDbUtil.setConfValues(conf); + try { + TxnDbUtil.prepDb(); + } catch (SQLException e) { + // Usually this means we've already created the tables, so clean them and then try again + tearDown(); + TxnDbUtil.prepDb(); + } + txnHandler = TxnUtils.getTxnStore(conf); + } + + @After + public void tearDown() throws Exception { + TxnDbUtil.cleanDb(); + } + + @Test + public void testOpenTxn() throws Exception { + long first = openTxn(); + assertEquals(1L, first); + long second = openTxn(); + assertEquals(2L, second); + GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); + assertEquals(2L, txnsInfo.getTxn_high_water_mark()); + assertEquals(2, txnsInfo.getOpen_txns().size()); + assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId()); + assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(0).getState()); + assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId()); + assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState()); + assertEquals("me", txnsInfo.getOpen_txns().get(1).getUser()); + assertEquals("localhost", txnsInfo.getOpen_txns().get(1).getHostname()); + + GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); + assertEquals(2L, txns.getTxn_high_water_mark()); + assertEquals(2, txns.getOpen_txns().size()); + boolean[] saw = new boolean[3]; + for (int i = 0; i < saw.length; i++) saw[i] = false; + for (Long tid : txns.getOpen_txns()) { + saw[tid.intValue()] = true; + } + for (int i = 1; i < saw.length; i++) assertTrue(saw[i]); + } + + private long openTxn() throws MetaException { + List txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids(); + return txns.get(0); + } + +}