diff --git jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index 00f4351..dd1d780 100644
--- jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -26,6 +26,7 @@
 import java.sql.DatabaseMetaData;
 import java.sql.NClob;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLClientInfoException;
 import java.sql.SQLException;
 import java.sql.SQLWarning;
@@ -310,8 +311,14 @@ public Statement createStatement() throws SQLException {
   public Statement createStatement(int resultSetType, int resultSetConcurrency)
       throws SQLException {
-    // TODO Auto-generated method stub
-    throw new SQLException("Method not supported");
+    if (resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) {
+      throw new SQLException("Method not supported");
+    }
+    if (resultSetType == ResultSet.TYPE_SCROLL_SENSITIVE) {
+      throw new SQLException("Method not supported");
+    }
+    return new HiveStatement(client, sessHandle,
+        resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE);
   }

 /*
diff --git jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
index 61985d1..cf2d475 100644
--- jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
+++ jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
@@ -20,6 +20,7 @@
 import static org.apache.hive.service.cli.thrift.TCLIServiceConstants.TYPE_NAMES;

+import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -60,6 +61,8 @@
   private Iterator<TRow> fetchedRowsItr;
   private boolean isClosed = false;
   private boolean emptyResultSet = false;
+  private boolean isScrollable = false;
+  private boolean fetchFirst = false;

   public static class Builder {
@@ -78,6 +81,7 @@
     private List<String> colTypes;
     private int fetchSize = 50;
     private boolean emptyResultSet = false;
+    private boolean isScrollable = false;

     public Builder setClient(TCLIService.Iface client) {
       this.client = client;
@@ -118,6 +122,11 @@ public Builder setEmptyResultSet(boolean emptyResultSet) {
       return this;
     }

+    public Builder setScrollable(boolean setScrollable) {
+      this.isScrollable = setScrollable;
+      return this;
+    }
+
     public HiveQueryResultSet build() throws SQLException {
       return new HiveQueryResultSet(this);
     }
@@ -142,6 +151,7 @@ protected HiveQueryResultSet(Builder builder) throws SQLException {
     } else {
       this.maxRows = builder.maxRows;
     }
+    this.isScrollable = builder.isScrollable;
   }

   /**
@@ -219,9 +229,18 @@ public boolean next() throws SQLException {
     }

     try {
+      TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT;
+      if (fetchFirst) {
+        // If we are asked to start from the beginning, clear the currently fetched resultset
+        orientation = TFetchOrientation.FETCH_FIRST;
+        fetchedRows = null;
+        fetchedRowsItr = null;
+        fetchFirst = false;
+      }
+
       if (fetchedRows == null || !fetchedRowsItr.hasNext()) {
         TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle,
-            TFetchOrientation.FETCH_NEXT, fetchSize);
+            orientation, fetchSize);
         TFetchResultsResp fetchResp = client.FetchResults(fetchReq);
         Utils.verifySuccessWithInfo(fetchResp.getStatus());
         fetchedRows = fetchResp.getResults().getRows();
@@ -267,6 +286,18 @@ public void setFetchSize(int rows) throws SQLException {
   }

   @Override
+  public int getType() throws SQLException {
+    if (isClosed) {
+      throw new SQLException("Resultset is closed");
+    }
+    if (isScrollable) {
+      return ResultSet.TYPE_SCROLL_INSENSITIVE;
+    } else {
+      return ResultSet.TYPE_FORWARD_ONLY;
+    }
+  }
+
+  @Override
   public int getFetchSize() throws SQLException {
     if (isClosed) {
       throw new SQLException("Resultset is closed");
     }
@@ -283,4 +314,36 @@ public int getFetchSize() throws SQLException {
     //JDK 1.7
     throw new SQLException("Method not supported");
   }
+
+  /**
+   * Moves the cursor before the first row of the resultset.
+   *
+   * @see java.sql.ResultSet#next()
+   * @throws SQLException
+   *           if a database access error occurs.
+   */
+  @Override
+  public void beforeFirst() throws SQLException {
+    if (isClosed) {
+      throw new SQLException("Resultset is closed");
+    }
+    if (!isScrollable) {
+      throw new SQLException("Method not supported for TYPE_FORWARD_ONLY resultset");
+    }
+    fetchFirst = true;
+    rowsFetched = 0;
+  }
+
+  @Override
+  public boolean isBeforeFirst() throws SQLException {
+    if (isClosed) {
+      throw new SQLException("Resultset is closed");
+    }
+    return (rowsFetched == 0);
+  }
+
+  @Override
+  public int getRow() throws SQLException {
+    return rowsFetched;
+  }
 }
diff --git jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index 982ceb8..5a30478 100644
--- jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -45,6 +45,7 @@
   private final TSessionHandle sessHandle;
   Map<String, String> sessConf = new HashMap<String, String>();
   private int fetchSize = 50;
+  private boolean isScrollableResultset = false;

   /**
    * We need to keep a reference to the result set to support the following:
    *
@@ -75,8 +76,14 @@
    *
    */
   public HiveStatement(TCLIService.Iface client, TSessionHandle sessHandle) {
+    this(client, sessHandle, false);
+  }
+
+  public HiveStatement(TCLIService.Iface client, TSessionHandle sessHandle,
+      boolean isScrollableResultset) {
     this.client = client;
     this.sessHandle = sessHandle;
+    this.isScrollableResultset = isScrollableResultset;
   }

   /*
@@ -197,6 +204,7 @@ public boolean execute(String sql) throws SQLException {
     }
     resultSet = new HiveQueryResultSet.Builder().setClient(client).setSessionHandle(sessHandle)
         .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
+        .setScrollable(isScrollableResultset)
         .build();
     return true;
   }
diff --git jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java
index 1042125..7f7b86f 100644
--- jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -31,8 +31,10 @@
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -196,6 +198,7 @@ protected void tearDown() throws Exception {
         expectedException);
   }

+
   public void testBadURL() throws Exception {
     checkBadUrl("jdbc:hive2://localhost:10000;principal=test");
     checkBadUrl("jdbc:hive2://localhost:10000;" +
@@ -1335,6 +1338,145 @@ public void testInvalidURL() throws Exception {
     assertNull(conn);
   }

+  /**
+   * Test repositioning the cursor to the start of the resultset.
+   * @throws Exception
+   */
+  public void testFetchFirstQuery() throws Exception {
+    execFetchFirst("select c1 from " + dataTypeTableName + " order by c1", false);
+    execFetchFirst("select c1 from " + dataTypeTableName + " order by c1", true);
+  }
+
+  /**
+   * Test repositioning the cursor to the start of the resultset for a non-MR query.
+   * @throws Exception
+   */
+  public void testFetchFirstNonMR() throws Exception {
execFetchFirst("select * from " + dataTypeTableName, false); + } + + /** + * Test for cursor repositioning to start of resultset for non-sql commands + * @throws Exception + */ + public void testFetchFirstSetCmds() throws Exception { + // verify that fetch_first is not supported + Statement stmt = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, + ResultSet.CONCUR_READ_ONLY); + ResultSet res = stmt.executeQuery("set -v"); + String resultStr = null; + while(res.next()) { + if (resultStr == null) { + resultStr = res.getString(1); + } + } + res.beforeFirst(); + res.next(); + assertEquals("Values should be same after reposition", + resultStr, res.getString(1)); + res.close(); + stmt.close(); + } + + /** + * Test for cursor repositioning to start of resultset for non-sql commands + * @throws Exception + */ + public void testFetchFirstDfsCmds() throws Exception { + // verify that fetch_first is not supported + Statement stmt = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, + ResultSet.CONCUR_READ_ONLY); + String wareHouseDir = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname); + ResultSet res = stmt.executeQuery("dfs -ls " + wareHouseDir); + String resultStr = null; + while(res.next()) { + if (resultStr == null) { + resultStr = res.getString(1); + } + } + res.beforeFirst(); + res.next(); + assertEquals("Values should be same after reposition", + resultStr, res.getString(1)); + } + + + /** + * Negative Test for cursor repositioning to start of resultset + * Verify unsupported JDBC resultset attributes + * @throws Exception + */ + public void testUnsupportedFetchTypes() throws Exception { + Statement stmt; + try { + stmt = con.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, + ResultSet.CONCUR_READ_ONLY); + assertTrue("createStatement with TYPE_SCROLL_SENSITIVE should fail", false); + } catch(SQLException e) { + assertEquals("Method not supported", e.getMessage()); + } + + try { + stmt = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, + ResultSet.CONCUR_UPDATABLE); + assertTrue("createStatement with CONCUR_UPDATABLE should fail", false); + } catch(SQLException e) { + assertEquals("Method not supported", e.getMessage()); + } + } + + /** + * Negative Test for cursor repositioning to start of resultset + * Verify unsupported JDBC resultset methods + * @throws Exception + */ + public void testFetchFirstError() throws Exception { + Statement stmt = con.createStatement(); + ResultSet res = stmt.executeQuery("select * from " + tableName); + try { + res.beforeFirst(); + assertTrue("beforeFirst() should fail for normal resultset", false); + } catch (SQLException e) { + assertEquals("Method not supported for TYPE_FORWARD_ONLY resultset", e.getMessage()); + } + } + /** + * Read the results locally. Then reset the read position to start and read the + * rows again verify that we get the same results next time. 
+   * @param sqlStmt
+   * @param oneRowOnly
+   * @throws Exception
+   */
+  private void execFetchFirst(String sqlStmt, boolean oneRowOnly) throws Exception {
+    Statement stmt = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,
+        ResultSet.CONCUR_READ_ONLY);
+    ResultSet res = stmt.executeQuery(sqlStmt);
+
+    List<Integer> results = new ArrayList<Integer>();
+    assertTrue(res.isBeforeFirst());
+    int rowNum = 0;
+    while (res.next()) {
+      results.add(res.getInt(1));
+      assertEquals(++rowNum, res.getRow());
+      assertFalse(res.isBeforeFirst());
+      if (oneRowOnly) {
+        break;
+      }
+    }
+    // reposition at the beginning
+    res.beforeFirst();
+    assertTrue(res.isBeforeFirst());
+    rowNum = 0;
+    while (res.next()) {
+      // compare to the results fetched the first time
+      assertEquals(results.get(rowNum++).intValue(), res.getInt(1));
+      assertEquals(rowNum, res.getRow());
+      assertFalse(res.isBeforeFirst());
+      if (oneRowOnly) {
+        break;
+      }
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/Context.java ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 2a3ee24..15de0cf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -473,6 +473,13 @@ private DataInput getNextStream() {
     return null;
   }

+  public void resetStream() {
+    if (initialized) {
+      resDirFilesNum = 0;
+      initialized = false;
+    }
+  }
+
   /**
    * Little abbreviation for StringUtils.
    */
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 2a6b944..7e33690 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1529,6 +1529,20 @@ public boolean getResults(ArrayList<String> res) throws IOException, CommandNeed
     return true;
   }

+  public void resetFetch() {
+    if (plan != null && plan.getFetchTask() != null) {
+      try {
+        plan.getFetchTask().clearFetch();
+      } catch (Exception e) {
+        LOG.warn("Error closing the current fetch task", e);
+      }
+      plan.getFetchTask().initialize(conf, plan, null);
+    } else {
+      ctx.resetStream();
+      resStream = null;
+    }
+  }
+
   public int getTryCount() {
     return tryCount;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index df2ccf1..f9805f1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -76,6 +76,7 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
       sink = work.getSink();
       fetch = new FetchOperator(work, job, source, getVirtualColumns(source));
       source.initialize(conf, new ObjectInspector[]{fetch.getOutputObjectInspector()});
+      totalRows = 0;
     } catch (Exception e) {
       // Bail out ungracefully - we should never hit
diff --git ql/src/java/org/apache/hadoop/hive/ql/processors/DfsProcessor.java ql/src/java/org/apache/hadoop/hive/ql/processors/DfsProcessor.java
index ce54e0c..79dc6c6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/processors/DfsProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/processors/DfsProcessor.java
@@ -24,6 +24,8 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -36,11 +38,19 @@
   public static final Log LOG = LogFactory.getLog(DfsProcessor.class.getName());
   public static final LogHelper console = new LogHelper(LOG);
+  public static final String DFS_RESULT_HEADER = "DFS Output";

   private final FsShell dfs;
+  private final Schema dfsSchema;

   public DfsProcessor(Configuration conf) {
+    this(conf, false);
+  }
+
+  public DfsProcessor(Configuration conf, boolean addSchema) {
     dfs = new FsShell(conf);
+    dfsSchema = new Schema();
+    dfsSchema.addToFieldSchemas(new FieldSchema(DFS_RESULT_HEADER, "string", ""));
   }

   public void init() {
@@ -66,7 +76,7 @@ public CommandProcessorResponse run(String command) {
       }

       System.setOut(oldOut);
-      return new CommandProcessorResponse(ret);
+      return new CommandProcessorResponse(ret, "", "", dfsSchema);

     } catch (Exception e) {
       console.printError("Exception raised from DFSShell.run "
diff --git service/src/java/org/apache/hive/service/cli/operation/DfsOperation.java service/src/java/org/apache/hive/service/cli/operation/DfsOperation.java
index a8b8ed4..b550f84 100644
--- service/src/java/org/apache/hive/service/cli/operation/DfsOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/DfsOperation.java
@@ -32,7 +32,7 @@
   protected DfsOperation(HiveSession parentSession,
       String statement, Map<String, String> confOverlay) {
     super(parentSession, statement, confOverlay);
-    setCommandProcessor(new DfsProcessor(parentSession.getHiveConf()));
+    setCommandProcessor(new DfsProcessor(parentSession.getHiveConf(), true));
   }
 }
diff --git service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
index 581e69c..eb40dbf 100644
--- service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
@@ -18,6 +18,8 @@
 package org.apache.hive.service.cli.operation;

+import java.util.EnumSet;
+
 import org.apache.hive.service.cli.FetchOrientation;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.OperationState;
@@ -63,6 +65,11 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
+    validateFetchOrientation(orientation,
+        EnumSet.of(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST));
+    if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
+      rowSet.setStartOffset(0);
+    }
     return rowSet.extractSubset((int)maxRows);
   }
 }
diff --git service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
index af87a90..3424890 100644
--- service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
@@ -20,6 +20,7 @@
 import java.sql.DatabaseMetaData;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.regex.Pattern;

@@ -192,6 +193,11 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
+    validateFetchOrientation(orientation,
+        EnumSet.of(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST));
+    if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
+      rowSet.setStartOffset(0);
+    }
     return rowSet.extractSubset((int)maxRows);
   }
diff --git service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java
index 0fe01c0..f1fc046 100644
--- service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java
@@ -19,6 +19,7 @@
 package org.apache.hive.service.cli.operation;

 import java.sql.DatabaseMetaData;
+import java.util.EnumSet;
 import java.util.Set;

 import org.apache.hadoop.hive.ql.exec.FunctionInfo;
@@ -115,6 +116,11 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
+    validateFetchOrientation(orientation,
+        EnumSet.of(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST));
+    if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
+      rowSet.setStartOffset(0);
+    }
     return rowSet.extractSubset((int)maxRows);
   }
 }
diff --git service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java
index bafe40c..ace51a0 100644
--- service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java
@@ -18,6 +18,8 @@
 package org.apache.hive.service.cli.operation;

+import java.util.EnumSet;
+
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hive.service.cli.FetchOrientation;
 import org.apache.hive.service.cli.HiveSQLException;
@@ -84,6 +86,11 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
+    validateFetchOrientation(orientation,
+        EnumSet.of(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST));
+    if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
+      rowSet.setStartOffset(0);
+    }
     return rowSet.extractSubset((int)maxRows);
   }
 }
diff --git service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java
index eaf867e..a18d4df 100644
--- service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java
@@ -18,6 +18,8 @@
 package org.apache.hive.service.cli.operation;

+import java.util.EnumSet;
+
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hive.service.cli.FetchOrientation;
 import org.apache.hive.service.cli.HiveSQLException;
@@ -75,6 +77,11 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
+    validateFetchOrientation(orientation,
+        EnumSet.of(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST));
+    if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
+      rowSet.setStartOffset(0);
+    }
     return rowSet.extractSubset((int)maxRows);
   }
diff --git service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java
index d9d0e9c..440f0e8 100644
--- service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java
@@ -19,6 +19,7 @@
 package org.apache.hive.service.cli.operation;

 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;

 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -110,6 +111,11 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
+    validateFetchOrientation(orientation,
+        EnumSet.of(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST));
+    if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
+      rowSet.setStartOffset(0);
+    }
     return rowSet.extractSubset((int)maxRows);
   }
 }
diff --git service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java
index 2daa9cd..62dc78f 100644
--- service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java
@@ -18,6 +18,8 @@
 package org.apache.hive.service.cli.operation;

+import java.util.EnumSet;
+
 import org.apache.hive.service.cli.FetchOrientation;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.OperationState;
@@ -130,6 +132,11 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
+    validateFetchOrientation(orientation,
+        EnumSet.of(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST));
+    if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
+      rowSet.setStartOffset(0);
+    }
     return rowSet.extractSubset((int)maxRows);
   }
 }
diff --git service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
index 0a8825e..8524e5e 100644
--- service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
@@ -27,6 +27,7 @@
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;

@@ -146,6 +147,11 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
    */
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+    validateFetchOrientation(orientation,
+        EnumSet.of(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST));
+    if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
+      resetResultReader();
+    }
     List<String> rows = readResults((int) maxRows);
     RowSet rowSet = new RowSet();
@@ -172,7 +178,6 @@ public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws H
         throw new HiveSQLException(e);
       }
     }
-
     List<String> results = new ArrayList<String>();

     for (int i = 0; i < nLines || nLines <= 0; ++i) {
@@ -193,11 +198,15 @@ public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws H
   }

   private void cleanTmpFile() {
+    resetResultReader();
+    SessionState sessionState = getParentSession().getSessionState();
+    File tmp = sessionState.getTmpOutputFile();
+    tmp.delete();
+  }
+
+  private void resetResultReader() {
     if (resultReader != null) {
-      SessionState sessionState = getParentSession().getSessionState();
-      File tmp = sessionState.getTmpOutputFile();
       IOUtils.cleanup(LOG, resultReader);
-      tmp.delete();
       resultReader = null;
     }
   }
diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 6f4b8dc..9d9eca7 100644
--- service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hive.service.cli.operation;

+import java.util.EnumSet;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -124,4 +126,21 @@ public void cancel() throws HiveSQLException {
   public RowSet getNextRowSet() throws HiveSQLException {
     return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS);
   }
+
+  /**
+   * Verify that the given fetch orientation is among the supported orientation types.
+   * @param orientation
+   * @param supportedOrientations
+   * @throws HiveSQLException
+   */
+  public void validateFetchOrientation(FetchOrientation orientation,
+      EnumSet<FetchOrientation> supportedOrientations) throws HiveSQLException {
+    for (FetchOrientation supportedOrientation : supportedOrientations) {
+      if (supportedOrientation.equals(orientation)) {
+        return; // found a match
+      }
+    }
+    throw new HiveSQLException("The fetch type " + orientation.toString() +
+        " is not supported for this resultset", "HY106");
+  }
 }
diff --git service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 976a1ef..58e5cff 100644
--- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -61,6 +62,7 @@
   private TableSchema resultSchema = null;
   private Schema mResultSchema = null;
   private SerDe serde = null;
+  private boolean fetchStarted = false;

   public SQLOperation(HiveSession parentSession, String statement,
       Map<String, String> confOverlay) {
@@ -156,10 +158,19 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
+    validateFetchOrientation(orientation,
+        EnumSet.of(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST));
     ArrayList<String> rows = new ArrayList<String>();
     driver.setMaxRows((int)maxRows);

     try {
+      /* If the client requests fetch-from-start and this is not the first read
+       * from this operation, reset the fetch position to the beginning.
+       */
+      if (orientation.equals(FetchOrientation.FETCH_FIRST) && fetchStarted) {
+        driver.resetFetch();
+      }
+      fetchStarted = true;
       driver.getResults(rows);
       getSerDe();
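
For reference, the client-side flow this patch enables looks like the following minimal sketch, modeled on execFetchFirst in TestJdbcDriver2 above; the JDBC URL, credentials, and table name are placeholder assumptions, not part of the patch:

// Minimal usage sketch for the scrollable resultset support added above.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class FetchFirstExample {
  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    // Placeholder connection URL; point it at a real HiveServer2 instance.
    Connection con = DriverManager.getConnection(
        "jdbc:hive2://localhost:10000/default", "", "");
    // TYPE_SCROLL_INSENSITIVE with CONCUR_READ_ONLY is the only scrollable
    // combination HiveConnection.createStatement() now accepts.
    Statement stmt = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,
        ResultSet.CONCUR_READ_ONLY);
    ResultSet res = stmt.executeQuery("select c1 from some_table");
    while (res.next()) {
      System.out.println(res.getRow() + ": " + res.getString(1));
    }
    // Reposition before the first row; the next fetch is sent with
    // TFetchOrientation.FETCH_FIRST instead of re-executing the query.
    res.beforeFirst();
    while (res.next()) {
      System.out.println(res.getRow() + ": " + res.getString(1));
    }
    res.close();
    stmt.close();
    con.close();
  }
}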