diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index dedfe3f..7da5365 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -790,6 +790,8 @@ public void testTransactionBatchCommit_Json() throws Exception { , txnBatch.getCurrentTransactionState()); connection.close(); + List rs = queryTable(driver, "select * from " + dbName + "." + tblName); + Assert.assertEquals(1, rs.size()); } @Test diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 5fe3525..cd313b8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -904,6 +904,14 @@ private void recordValidTxns() throws LockException { ValidTxnList txns = txnMgr.getValidTxns(); String txnStr = txns.toString(); conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr); + if(plan.getFetchTask() != null) { + /** + * This is needed for {@link HiveConf.ConfVars#HIVEFETCHTASKCONVERSION} optimization which + * initializes JobConf in FetchOperator before recordValidTxns() but this has to be done + * after locks are acquired to avoid race conditions in ACID.
+ */ + plan.getFetchTask().setValidTxnList(txnStr); + } LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index d8ac6ae..601ad08 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -145,6 +146,9 @@ public FetchOperator(FetchWork work, JobConf job, Operator operator, initialize(); } + public void setValidTxnList(String txnStr) { + job.set(ValidTxnList.VALID_TXNS_KEY, txnStr); + } private void initialize() throws HiveException { if (isStatReader) { outputOI = work.getStatRowOI(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index dff1815..db6848a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -24,7 +24,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; @@ -59,6 +58,9 @@ public FetchTask() { super(); } + public void setValidTxnList(String txnStr) { + fetch.setValidTxnList(txnStr); + } @Override public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext ctx, CompilationOpContext opContext) { diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java 
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 245a3bc..3fec3f9 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -714,6 +714,17 @@ public void testValidTxnsBookkeeping() throws Exception { } @Test + public void testSimpleRead() throws Exception { + hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "more"); + int[][] tableData = {{1,2},{3,3}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(tableData)); + int[][] tableData2 = {{5,3}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(tableData2)); + hiveConf.set(ValidTxnList.VALID_TXNS_KEY, "0:"); + List rs = runStatementOnDriver("select * from " + Table.ACIDTBL); + Assert.assertEquals("Missing data", 3, rs.size()); + } + @Test public void testUpdateMixedCase() throws Exception { int[][] tableData = {{1,2},{3,3},{5,3}}; runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));