diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 8c33f6ac0d..3d4cb839a0 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -27,6 +27,11 @@ import java.util.List; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.curator.shaded.com.google.common.collect.Lists; @@ -41,6 +46,7 @@ import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; @@ -60,6 +66,7 @@ import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Ignore; @@ -77,6 +84,7 @@ * Mostly uses bucketed tables */ public class TestTxnCommands extends TxnCommandsBaseForTests { + static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class); private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + TestTxnCommands.class.getCanonicalName() @@ -108,6 +116,7 @@ public void testInsertOverwrite() throws Exception { Assert.assertEquals("1", rs.get(0)); Assert.assertEquals("5", rs.get(1)); } + @Ignore("not needed but useful for testing") @Test public void testNonAcidInsert() throws Exception { @@ -230,6 +239,186 @@ public void testMmExim() throws Exception { msClient.close(); } + private static final class QueryRunnable implements Runnable { + private final CountDownLatch cdlIn, cdlOut; + private final String query; + private final HiveConf hiveConf; + + QueryRunnable(HiveConf hiveConf, String query, CountDownLatch cdlIn, CountDownLatch cdlOut) { + this.query = query; + this.cdlIn = cdlIn; + this.cdlOut = cdlOut; + this.hiveConf = new HiveConf(hiveConf); + } + + @Override + public void run() { + SessionState ss = SessionState.start(hiveConf); + try { + ss.applyAuthorizationPolicy(); + } catch (HiveException e) { + throw new RuntimeException(e); + } + QueryState qs = new QueryState.Builder().withHiveConf(hiveConf).nonIsolated().build(); + Driver d = new Driver(qs, null); + try { + LOG.info("Ready to run the query: " + query); + syncThreadStart(cdlIn, cdlOut); + try { + CommandProcessorResponse cpr = d.run(query); + if(cpr.getResponseCode() != 0) { + throw new RuntimeException(query + " failed: " + cpr); + } + d.getResults(new ArrayList()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } finally { + d.close(); + } + } + } + + + private static void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) { + cdlIn.countDown(); + try { + cdlOut.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testParallelInsertStats() throws Exception { + final int TASK_COUNT = 4; + String tableName = "mm_table"; + List stats; + IMetaStoreClient msClient = prepareParallelTest(tableName, 0); + + String[] queries = new String[TASK_COUNT]; + for (int i = 0; i < queries.length; ++i) { + queries[i] = String.format("insert into %s (a) values (" + i + ")", tableName); + } + + runParallelQueries(queries); + + // Verify stats are either invalid, or valid and correct. + stats = getTxnTableStats(msClient, tableName); + boolean hasStats = 0 != stats.size(); + if (hasStats) { + verifyLongStats(TASK_COUNT, 0, TASK_COUNT - 1, stats); + } + + runStatementOnDriver(String.format("insert into %s (a) values (" + TASK_COUNT + ")", tableName)); + if (!hasStats) { + // Stats should still be invalid if they were invalid. + stats = getTxnTableStats(msClient, tableName); + Assert.assertEquals(0, stats.size()); + } + + // Stats should be valid after analyze. + runStatementOnDriver(String.format("analyze table %s compute statistics for columns", tableName)); + verifyLongStats(TASK_COUNT + 1, 0, TASK_COUNT, getTxnTableStats(msClient, tableName)); + } + + private void verifyLongStats(int dvCount, int min, int max, List stats) { + Assert.assertEquals(1, stats.size()); + LongColumnStatsData data = stats.get(0).getStatsData().getLongStats(); + Assert.assertEquals(min, data.getLowValue()); + Assert.assertEquals(max, data.getHighValue()); + Assert.assertEquals(dvCount, data.getNumDVs()); + } + + private void runParallelQueries(String[] queries) + throws InterruptedException, ExecutionException { + ExecutorService executor = Executors.newFixedThreadPool(queries.length); + final CountDownLatch cdlIn = new CountDownLatch(queries.length), cdlOut = new CountDownLatch(1); + Future[] tasks = new Future[queries.length]; + for (int i = 0; i < tasks.length; ++i) { + tasks[i] = executor.submit(new QueryRunnable(hiveConf, queries[i], cdlIn, cdlOut)); + } + cdlIn.await(); // Wait for all threads to be ready. + cdlOut.countDown(); // Release them at the same time. + for (int i = 0; i < tasks.length; ++i) { + tasks[i].get(); + } + } + + private IMetaStoreClient prepareParallelTest(String tableName, int val) + throws Exception, MetaException, TException, NoSuchObjectException { + hiveConf.setBoolean("hive.stats.autogather", true); + hiveConf.setBoolean("hive.stats.column.autogather", true); + runStatementOnDriver("drop table if exists " + tableName); + runStatementOnDriver(String.format("create table %s (a int) stored as orc " + + "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')", + tableName)); + runStatementOnDriver(String.format("insert into %s (a) values (" + val + ")", tableName)); + runStatementOnDriver(String.format("insert into %s (a) values (" + val + ")", tableName)); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + // Stats should be valid after serial inserts. + List stats = getTxnTableStats(msClient, tableName); + Assert.assertEquals(1, stats.size()); + return msClient; + } + + + @Test + public void testParallelInsertAnalyzeStats() throws Exception { + String tableName = "mm_table"; + List stats; + IMetaStoreClient msClient = prepareParallelTest(tableName, 0); + + String[] queries = { + String.format("insert into %s (a) values (999)", tableName), + String.format("analyze table %s compute statistics for columns", tableName) + }; + runParallelQueries(queries); + + // Verify stats are either invalid, or valid and correct. + stats = getTxnTableStats(msClient, tableName); + boolean hasStats = 0 != stats.size(); + if (hasStats) { + verifyLongStats(2, 0, 999, stats); + } + + runStatementOnDriver(String.format("insert into %s (a) values (1000)", tableName)); + if (!hasStats) { + // Stats should still be invalid if they were invalid. + stats = getTxnTableStats(msClient, tableName); + Assert.assertEquals(0, stats.size()); + } + + // Stats should be valid after analyze. + runStatementOnDriver(String.format("analyze table %s compute statistics for columns", tableName)); + verifyLongStats(3, 0, 1000, getTxnTableStats(msClient, tableName)); + } + + // TODO## this test is broken; would probably be fixed by HIVE-20046 + @Test + public void testParallelTruncateAnalyzeStats() throws Exception { + String tableName = "mm_table"; + List stats; + IMetaStoreClient msClient = prepareParallelTest(tableName, 0); + + String[] queries = { + String.format("truncate table %s", tableName), + String.format("analyze table %s compute statistics for columns", tableName) + }; + runParallelQueries(queries); + + // Verify stats are either invalid, or valid and correct. + stats = getTxnTableStats(msClient, tableName); + boolean hasStats = 0 != stats.size(); + if (hasStats) { + verifyLongStats(0, 0, 0, stats); + } + + // Stats should be valid after analyze. + runStatementOnDriver(String.format("analyze table %s compute statistics for columns", tableName)); + verifyLongStats(0, 0, 0, getTxnTableStats(msClient, tableName)); + } + @Test public void testTxnStatsOnOff() throws Exception {