diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d62e527..b2c4e20 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2595,7 +2595,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     // TODO: Make use of this config to configure fetch size
     HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE("hive.server2.thrift.resultset.max.fetch.size",
         10000, "Max number of rows sent in one Fetch RPC call by the server to the client."),
-    HIVE_SERVER2_RESULTSET_DEFAULT_FETCH_SIZE("hive.server2.resultset.default.fetch.size", 10000,
+    HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE("hive.server2.thrift.resultset.default.fetch.size", 1000,
         "The number of rows sent in one Fetch RPC call by the server to the client, if not\n" +
         "specified by the client."),
     HIVE_SERVER2_XSRF_FILTER_ENABLED("hive.server2.xsrf.filter.enabled",false,
@@ -4185,6 +4185,7 @@ private static String getSQLStdAuthDefaultWhiteListPattern() {
     "hive\\.parquet\\..*",
     "hive\\.ppd\\..*",
     "hive\\.prewarm\\..*",
+    "hive\\.server2\\.thrift\\.resultset\\.default\\.fetch\\.size",
     "hive\\.server2\\.proxy\\.user",
     "hive\\.skewjoin\\..*",
     "hive\\.smbjoin\\..*",
diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 3d4057b..afe23f8 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -1321,4 +1321,34 @@ public void testReplDumpResultSet() throws Exception {
       fs.delete(testPath, true);
     }
   }
+
+  @Test
+  public void testFetchSize() throws Exception {
+    // Test setting fetch size below max
+    Connection fsConn = getConnection(miniHS2.getJdbcURL("default", "fetchSize=50", ""),
+        System.getProperty("user.name"), "bar");
+    Statement stmt = fsConn.createStatement();
+    stmt.execute("set hive.server2.thrift.resultset.serialize.in.tasks=true");
+    int fetchSize = stmt.getFetchSize();
+    assertEquals(50, fetchSize);
+    stmt.close();
+    fsConn.close();
+    // Test setting fetch size above max
+    fsConn = getConnection(
+        miniHS2.getJdbcURL(
+            "default",
+            "fetchSize=" + (miniHS2.getHiveConf().getIntVar(
+                HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE) + 1),
+            ""),
+        System.getProperty("user.name"), "bar");
+    stmt = fsConn.createStatement();
+    stmt.execute("set hive.server2.thrift.resultset.serialize.in.tasks=true");
+    fetchSize = stmt.getFetchSize();
+    assertEquals(
+        miniHS2.getHiveConf().getIntVar(
+            HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE),
+        fetchSize);
+    stmt.close();
+    fsConn.close();
+  }
 }
diff --git jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index d6cf744..6c91101 100644
--- jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -637,6 +637,9 @@ private void openSession() throws SQLException {
     }
     // switch the database
     openConf.put("use:database", connParams.getDbName());
+    // set the fetchSize
+    openConf.put("set:hiveconf:hive.server2.thrift.resultset.default.fetch.size",
+        Integer.toString(fetchSize));
 
     // set the session configuration
     Map<String, String> sessVars = connParams.getSessionVars();
@@ -662,6 +665,13 @@ private void openSession() throws SQLException {
       }
       protocol = openResp.getServerProtocolVersion();
       sessHandle = openResp.getSessionHandle();
+
+      // Update fetchSize if modified by server
+      String serverFetchSize =
+          openResp.getConfiguration().get("hive.server2.thrift.resultset.default.fetch.size");
+      if (serverFetchSize != null) {
+        fetchSize = Integer.parseInt(serverFetchSize);
+      }
     } catch (TException e) {
       LOG.error("Error opening session", e);
       throw new SQLException("Could not establish connection to "
diff --git serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
index 84ed6ba..16cc916 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
@@ -71,7 +71,9 @@
   @Override
   public void initialize(Configuration conf, Properties tbl) throws SerDeException {
     // Get column names
-    MAX_BUFFERED_ROWS = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE);
+    MAX_BUFFERED_ROWS =
+        HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE);
+    LOG.info("ThriftJDBCBinarySerDe max number of buffered rows: " + MAX_BUFFERED_ROWS);
     String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
     String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
     final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
diff --git service/src/java/org/apache/hive/service/cli/CLIService.java service/src/java/org/apache/hive/service/cli/CLIService.java
index de44ecb..45c608a 100644
--- service/src/java/org/apache/hive/service/cli/CLIService.java
+++ service/src/java/org/apache/hive/service/cli/CLIService.java
@@ -78,7 +78,7 @@ public CLIService(HiveServer2 hiveServer2) {
   public synchronized void init(HiveConf hiveConf) {
     this.hiveConf = hiveConf;
     sessionManager = new SessionManager(hiveServer2);
-    defaultFetchRows = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_RESULTSET_DEFAULT_FETCH_SIZE);
+    defaultFetchRows = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE);
     addService(sessionManager);
     // If the hadoop cluster is secure, do a kerberos login for the service from the keytab
     if (UserGroupInformation.isSecurityEnabled()) {
@@ -460,6 +460,11 @@ public OperationStatus getOperationStatus(OperationHandle opHandle)
     return opStatus;
   }
 
+  public HiveConf getSessionConf(SessionHandle sessionHandle)
+      throws HiveSQLException {
+    return sessionManager.getSession(sessionHandle).getSessionConf();
+  }
+
   /* (non-Javadoc)
    * @see org.apache.hive.service.cli.ICLIService#cancelOperation(org.apache.hive.service.cli.OperationHandle)
    */
diff --git service/src/java/org/apache/hive/service/cli/CLIServiceClient.java service/src/java/org/apache/hive/service/cli/CLIServiceClient.java
index 4b84872..c965abc 100644
--- service/src/java/org/apache/hive/service/cli/CLIServiceClient.java
+++ service/src/java/org/apache/hive/service/cli/CLIServiceClient.java
@@ -31,10 +31,10 @@
  *
  */
 public abstract class CLIServiceClient implements ICLIService {
-  protected int defaultFetchRows = ConfVars.HIVE_SERVER2_RESULTSET_DEFAULT_FETCH_SIZE.defaultIntVal;
+  protected int defaultFetchRows = ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.defaultIntVal;
 
   public CLIServiceClient(Configuration conf) {
-    defaultFetchRows = HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_RESULTSET_DEFAULT_FETCH_SIZE);
+    defaultFetchRows = HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE);
   }
 
   public SessionHandle openSession(String username, String password)
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSession.java service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index e5d865b..bd4d90d 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hive.service.auth.HiveAuthFactory;
@@ -183,7 +184,14 @@ OperationHandle getCrossReference(String primaryCatalog,
       String primarySchema, String primaryTable, String foreignCatalog,
       String foreignSchema, String foreignTable) throws HiveSQLException;
-
+
+  /**
+   * Get the HiveConf that applies to this session.
+   * @return the session level HiveConf
+   * @throws HiveSQLException
+   */
+  HiveConf getSessionConf() throws HiveSQLException;
+
   /**
    * close the session
    * @throws HiveSQLException
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index f939a93..fd74d55 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -177,6 +177,9 @@ public void open(Map<String, String> sessionConfMap) throws HiveSQLException {
     }
     // Process global init file: .hiverc
     processGlobalInitFile();
+    // Set fetch size in session conf map
+    sessionConfMap = setFetchSize(sessionConfMap);
+
     if (sessionConfMap != null) {
       configureSession(sessionConfMap);
     }
@@ -245,6 +248,22 @@ private void processGlobalInitFile() {
     }
   }
 
+  private Map<String, String> setFetchSize(Map<String, String> sessionConfMap) {
+    int maxFetchSize =
+        sessionConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE);
+    String confFetchSize = sessionConfMap != null ?
+        sessionConfMap.get(
+            "set:hiveconf:" + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname) :
+        null;
+    if (confFetchSize != null && !confFetchSize.isEmpty()) {
+      int fetchSize = Integer.parseInt(confFetchSize);
+      sessionConfMap.put(
+          "set:hiveconf:" + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname,
+          Integer.toString(fetchSize > maxFetchSize ? maxFetchSize : fetchSize));
+    }
+    return sessionConfMap;
+  }
+
   private void configureSession(Map<String, String> sessionConfMap) throws HiveSQLException {
     SessionState.setCurrentSessionState(sessionState);
     for (Map.Entry<String, String> entry : sessionConfMap.entrySet()) {
@@ -465,6 +484,11 @@ public GetInfoValue getInfo(GetInfoType getInfoType)
   }
 
   @Override
+  public HiveConf getSessionConf() throws HiveSQLException {
+    return this.sessionConf;
+  }
+
+  @Override
   public OperationHandle executeStatement(String statement, Map<String, String> confOverlay) throws HiveSQLException {
     return executeStatementInternal(statement, confOverlay, false, 0);
   }
diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 2938338..d7c1548 100644
--- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -309,10 +309,17 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
     LOG.info("Client protocol version: " + req.getClient_protocol());
     TOpenSessionResp resp = new TOpenSessionResp();
     try {
+      Map<String, String> openConf = req.getConfiguration();
+
       SessionHandle sessionHandle = getSessionHandle(req, resp);
       resp.setSessionHandle(sessionHandle.toTSessionHandle());
-      // TODO: set real configuration map
-      resp.setConfiguration(new HashMap<String, String>());
+      Map<String, String> configurationMap = new HashMap<String, String>();
+      // Set the updated fetch size from the server into the configuration map for the client
+      configurationMap.put(
+          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname,
+          Integer.toString(cliService.getSessionConf(sessionHandle).getIntVar(
+              HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE)));
+      resp.setConfiguration(configurationMap);
       resp.setStatus(OK_STATUS);
       ThriftCLIServerContext context =
         (ThriftCLIServerContext)currentServerContext.get();
@@ -697,6 +704,12 @@ public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq r
   public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
     TFetchResultsResp resp = new TFetchResultsResp();
     try {
+      // Set fetch size
+      int maxFetchSize =
+          hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE);
+      if (req.getMaxRows() > maxFetchSize) {
+        req.setMaxRows(maxFetchSize);
+      }
       RowSet rowSet = cliService.fetchResults(
           new OperationHandle(req.getOperationHandle()),
           FetchOrientation.getFetchOrientation(req.getOrientation()),
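
For reference, a minimal JDBC client sketch that exercises the fetch size negotiation introduced by this patch. The host, port, credentials, and table name below are placeholders and not part of the patch; the fetchSize URL parameter and the Statement.getFetchSize() behaviour follow the testFetchSize case above, assuming the client asks for a value below the server's max.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class FetchSizeClientSketch {
      public static void main(String[] args) throws Exception {
        // fetchSize in the URL is forwarded to HiveServer2 at OpenSession as
        // set:hiveconf:hive.server2.thrift.resultset.default.fetch.size and is capped
        // server-side at hive.server2.thrift.resultset.max.fetch.size.
        String url = "jdbc:hive2://localhost:10000/default;fetchSize=500"; // placeholder host/port
        try (Connection conn = DriverManager.getConnection(url, "hiveuser", ""); // placeholder credentials
             Statement stmt = conn.createStatement()) {
          // Reflects the value negotiated with the server, not necessarily the value requested.
          System.out.println("Negotiated fetch size: " + stmt.getFetchSize());
          try (ResultSet rs = stmt.executeQuery("SELECT * FROM sample_table")) { // placeholder table
            while (rs.next()) {
              // Each Thrift FetchResults call transfers at most the negotiated number of rows.
            }
          }
        }
      }
    }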
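On the server side, the renamed property can be tuned like any other HiveConf variable. A small sketch, assuming a locally constructed HiveConf with this patch applied; in a real deployment these values would normally be set in hive-site.xml rather than in code.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class FetchSizeConfSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Batch size used when the client does not supply fetchSize at connect time.
        conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE, 1000);
        // Upper bound; client-requested values above this are capped by HiveServer2.
        conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE, 10000);
        System.out.println("Default fetch size: "
            + conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE));
      }
    }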