diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 02d4360091..43f3b59b59 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -81,6 +81,8 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.junit.rules.TestName;
 
+import static java.sql.ResultSet.CONCUR_READ_ONLY;
+import static java.sql.ResultSet.TYPE_SCROLL_INSENSITIVE;
 import static org.apache.hadoop.hive.conf.SystemVariables.SET_COLUMN_NAME;
 import static org.apache.hadoop.hive.ql.exec.ExplainTask.EXPL_COLUMN_NAME;
 import static org.junit.Assert.assertEquals;
@@ -3210,6 +3212,39 @@ public void testGetQueryId() throws Exception {
     stmt1.close();
   }
 
+  @Test
+  public void testResultNextAcidTable() throws Exception {
+    Statement stmt = con.createStatement(TYPE_SCROLL_INSENSITIVE, CONCUR_READ_ONLY);
+    try {
+      stmt.execute("set " + ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + "=true");
+      stmt.execute("set " + ConfVars.HIVE_TXN_MANAGER.varname +
+          "=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+      stmt.execute("create table tbl (fld int) tblproperties(" +
+          "'transactional'='true','transactional_properties'='insert_only')");
+      stmt.execute("insert into tbl values (1)");
+      stmt.execute("insert into tbl values (2)");
+      stmt.execute("insert into tbl values (3)");
+      stmt.setMaxRows(1);
+      stmt.setFetchSize(1);
+      ResultSet res = stmt.executeQuery("select * from tbl");
+      boolean moreRow = res.next();
+      while (moreRow) {
+        moreRow = res.next();
+      }
+      res.beforeFirst();
+      moreRow = res.next();
+      while (moreRow) {
+        moreRow = res.next();
+      }
+      stmt.execute("drop table tbl");
+    } finally {
+      stmt.execute("set " + ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + "=false");
+      stmt.execute("set " + ConfVars.HIVE_TXN_MANAGER.varname +
+          "=org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+      stmt.close();
+    }
+  }
+
   // Test that opening a JDBC connection to a non-existent database throws a HiveSQLException
   @Test(expected = HiveSQLException.class)
   public void testConnectInvalidDatabase() throws SQLException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index caa9d83b4f..93b115841b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -51,6 +51,7 @@
   private ListSinkOperator sink;
   private int totalRows;
   private static transient final Logger LOG = LoggerFactory.getLogger(FetchTask.class);
+  JobConf job = null;
 
   public FetchTask() {
     super();
@@ -68,7 +69,11 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext
     try {
       // Create a file system handle
-      JobConf job = new JobConf(conf);
+      if (job == null) {
+        // The job config should be initialized once per fetch task. In case of a re-fetch, we should use the
+        // same config.
+        job = new JobConf(conf);
+      }
       Operator source = work.getSource();
       if (source instanceof TableScanOperator) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 4cd60cc6ee..9e3c3ca6f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -611,6 +611,7 @@ private String getValidWriteIdList(String dbName, String tblName, String validTx
   // Get list of valid transactions for Repl Dump. Also wait for a given amount of time for the
   // open transactions to finish. Abort any open transactions after the wait is over.
   String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) throws HiveException {
+
     // Key design point for REPL DUMP is to not have any txns older than current txn in which
     // dump runs. This is needed to ensure that Repl dump doesn't copy any data files written by
     // any open txns mainly for streaming ingest case where one delta file shall have data from
@@ -620,6 +621,7 @@ String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) throws HiveEx
     // of time to see if all open txns < current txn is getting aborted/committed. If not, then
     // we forcefully abort those txns just like AcidHouseKeeperService.
     ValidTxnList validTxnList = getTxnMgr().getValidTxns();
+    LOG.info("REPL DUMP getValidTxnListForReplDump: {}", validTxnList.writeToString());
     while (System.currentTimeMillis() < waitUntilTime) {
       // If there are no txns which are open for the given ValidTxnList snapshot, then just return it.
       if (getOpenTxns(validTxnList).isEmpty()) {
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 54289afc97..5cc7170a5c 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -24,6 +24,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileChecksum;
@@ -125,10 +126,25 @@ public Path getEffectivePath() {
     }
   }
 
+  static void test() throws Exception {
+    throw new Exception("");
+  }
+
   public static synchronized ReplChangeManager getInstance(Configuration conf) throws MetaException {
     if (instance == null) {
+      try {
+        test();
+      } catch (Exception e) {
+        LOG.info("Called getInstance, creating instance: " + ExceptionUtils.getStackTrace(e));
+      }
       instance = new ReplChangeManager(conf);
+    } else {
+      try {
+        test();
+      } catch (Exception e) {
+        LOG.info("Called getInstance, already created: " + ExceptionUtils.getStackTrace(e));
+      }
     }
     return instance;
   }
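
Note on the FetchTask hunk, which is the core of this patch: testResultNextAcidTable drives a second pass over the ResultSet via beforeFirst(), which re-enters FetchTask's initialization on the server. Caching the JobConf means the re-fetch runs against the exact configuration the first fetch resolved (for an ACID scan, presumably including its transactional snapshot). Below is a minimal, self-contained sketch of that init-once idiom; SketchFetchTask and the plain Map standing in for JobConf are hypothetical names, illustrative only, not Hive code.

import java.util.HashMap;
import java.util.Map;

public class SketchFetchTask {
  // Stand-in for the JobConf field the patch adds: created once, reused on re-fetch.
  private Map<String, String> job = null;

  public void initialize(Map<String, String> conf) {
    if (job == null) {
      // First fetch: snapshot the configuration exactly once.
      job = new HashMap<>(conf);
    }
    // A re-fetch re-enters initialize(); the cached snapshot is reused unchanged.
  }

  public static void main(String[] args) {
    SketchFetchTask task = new SketchFetchTask();
    Map<String, String> conf = new HashMap<>();
    conf.put("valid.writeid.list", "tbl:3");  // hypothetical key and value
    task.initialize(conf);                    // first fetch: snapshot taken
    conf.put("valid.writeid.list", "tbl:9");  // config drifts afterwards
    task.initialize(conf);                    // re-fetch: snapshot unchanged
    System.out.println(task.job);             // prints {valid.writeid.list=tbl:3}
  }
}

Keeping the null check inside initialize(), rather than building the config eagerly in the constructor, preserves the existing lazy behavior; only second and later initializations change.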