diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index d797d40642..35611deb57 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -28,6 +28,7 @@ import org.apache.hive.service.rpc.thrift.TSetClientInfoReq; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.common.auth.HiveAuthUtils; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hive.jdbc.Utils.JdbcConnectionParams; import org.apache.hive.service.auth.HiveAuthConstants; import org.apache.hive.service.auth.KerberosSaslHelper; @@ -70,6 +71,9 @@ import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; @@ -143,7 +147,8 @@ private final List supportedProtocols = new LinkedList(); private int loginTimeout = 0; private TProtocolVersion protocol; - private int fetchSize = HiveStatement.DEFAULT_FETCH_SIZE; + private int initFetchSize = 0; + private int defaultFetchSize = 0; private String initFile = null; private String wmPool = null, wmApp = null; private Properties clientInfo; @@ -262,7 +267,7 @@ public HiveConnection(String uri, Properties info) throws SQLException { isEmbeddedMode = connParams.isEmbeddedMode(); if (sessConfMap.containsKey(JdbcConnectionParams.FETCH_SIZE)) { - fetchSize = Integer.parseInt(sessConfMap.get(JdbcConnectionParams.FETCH_SIZE)); + initFetchSize = Integer.parseInt(sessConfMap.get(JdbcConnectionParams.FETCH_SIZE)); } if (sessConfMap.containsKey(JdbcConnectionParams.INIT_FILE)) { initFile = sessConfMap.get(JdbcConnectionParams.INIT_FILE); @@ -832,9 +837,6 @@ private void openSession() throws SQLException { } // switch the database openConf.put("use:database", connParams.getDbName()); - // set the fetchSize - openConf.put("set:hiveconf:hive.server2.thrift.resultset.default.fetch.size", - Integer.toString(fetchSize)); if (wmPool != null) { openConf.put("set:hivevar:wmpool", wmPool); } @@ -867,12 +869,13 @@ private void openSession() throws SQLException { protocol = openResp.getServerProtocolVersion(); sessHandle = openResp.getSessionHandle(); - // Update fetchSize if modified by server - String serverFetchSize = - openResp.getConfiguration().get("hive.server2.thrift.resultset.default.fetch.size"); - if (serverFetchSize != null) { - fetchSize = Integer.parseInt(serverFetchSize); - } + String serverFetchSizeString = + openResp.getConfiguration().get(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname); + Preconditions.checkState(serverFetchSizeString != null, "Server returned a null default fetch size. Check that " + + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname + " is configured correctly."); + + defaultFetchSize = Integer.parseInt(serverFetchSizeString); + Preconditions.checkState(defaultFetchSize > 0, "Default fetch size must be greater than 0"); } catch (TException e) { LOG.error("Error opening session", e); throw new SQLException("Could not establish connection to " @@ -1107,7 +1110,7 @@ public Statement createStatement() throws SQLException { if (isClosed) { throw new SQLException("Can't create Statement, connection is closed"); } - return new HiveStatement(this, client, sessHandle, fetchSize); + return new HiveStatement(this, client, sessHandle, false, initFetchSize, defaultFetchSize); } /* @@ -1127,8 +1130,8 @@ public Statement createStatement(int resultSetType, int resultSetConcurrency) throw new SQLException("Statement with resultset type " + resultSetType + " is not supported", "HYC00"); // Optional feature not implemented } - return new HiveStatement(this, client, sessHandle, - resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE, fetchSize); + return new HiveStatement(this, client, sessHandle, resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE, + initFetchSize, defaultFetchSize); } /* diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 693203fab3..d22b7bb4fd 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -42,6 +42,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -53,6 +55,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; /** * HiveStatement. @@ -60,13 +63,15 @@ */ public class HiveStatement implements java.sql.Statement { public static final Logger LOG = LoggerFactory.getLogger(HiveStatement.class.getName()); - public static final int DEFAULT_FETCH_SIZE = 1000; + private static final int DEFAULT_FETCH_SIZE = 1000; + private final HiveConnection connection; private TCLIService.Iface client; private TOperationHandle stmtHandle = null; private final TSessionHandle sessHandle; Map sessConf = new HashMap(); - private int fetchSize = DEFAULT_FETCH_SIZE; + private int fetchSize; + private final int defaultFetchSize; private boolean isScrollableResultset = false; private boolean isOperationComplete = false; /** @@ -122,26 +127,21 @@ public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle) { - this(connection, client, sessHandle, false, DEFAULT_FETCH_SIZE); + this(connection, client, sessHandle, false, 0, DEFAULT_FETCH_SIZE); } - public HiveStatement(HiveConnection connection, TCLIService.Iface client, - TSessionHandle sessHandle, int fetchSize) { - this(connection, client, sessHandle, false, fetchSize); - } + public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle, + boolean isScrollableResultset, int initFetchSize, int defaultFetchSize) { + Preconditions.checkArgument(initFetchSize >= 0); + Preconditions.checkArgument(defaultFetchSize > 0); - public HiveStatement(HiveConnection connection, TCLIService.Iface client, - TSessionHandle sessHandle, boolean isScrollableResultset) { - this(connection, client, sessHandle, isScrollableResultset, DEFAULT_FETCH_SIZE); - } + this.connection = Objects.requireNonNull(connection); + this.client = Objects.requireNonNull(client); + this.sessHandle = Objects.requireNonNull(sessHandle); - public HiveStatement(HiveConnection connection, TCLIService.Iface client, - TSessionHandle sessHandle, boolean isScrollableResultset, int fetchSize) { - this.connection = connection; - this.client = client; - this.sessHandle = sessHandle; this.isScrollableResultset = isScrollableResultset; - this.fetchSize = fetchSize; + this.defaultFetchSize = defaultFetchSize; + this.fetchSize = (initFetchSize == 0) ? defaultFetchSize : initFetchSize; } /* @@ -811,12 +811,9 @@ public void setFetchDirection(int direction) throws SQLException { public void setFetchSize(int rows) throws SQLException { checkConnection("setFetchSize"); if (rows > 0) { - fetchSize = rows; + this.fetchSize = rows; } else if (rows == 0) { - // Javadoc for Statement interface states that if the value is zero - // then "fetch size" hint is ignored. - // In this case it means reverting it to the default value. - fetchSize = DEFAULT_FETCH_SIZE; + this.fetchSize = this.defaultFetchSize; } else { throw new SQLException("Fetch size must be greater or equal to 0"); }