diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index dc31505a44..f41ac85649 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -531,7 +531,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "The smaller it is the more load there will be on the jobtracker, the higher it is the less granular the caught will be."), DYNAMICPARTITIONING("hive.exec.dynamic.partition", true, "Whether or not to allow dynamic partitions in DML/DDL."), - DYNAMICPARTITIONINGMODE("hive.exec.dynamic.partition.mode", "strict", + DYNAMICPARTITIONINGMODE("hive.exec.dynamic.partition.mode", "nostrict", "In strict mode, the user must specify at least one static partition\n" + "in case the user accidentally overwrites all partitions.\n" + "In nonstrict mode all partitions are allowed to be dynamic."), @@ -1845,7 +1845,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "filter operators."), // Concurrency - HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false, + HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", true, "Whether Hive supports concurrency control or not. \n" + "A ZooKeeper instance must be up and running when using zookeeper Hive lock manager "), HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager", ""), @@ -1894,7 +1894,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal // Transactions HIVE_TXN_MANAGER("hive.txn.manager", - "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager", + "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager", "Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive\n" + "transactions, which also requires appropriate settings for hive.compactor.initiator.on,\n" + "hive.compactor.worker.threads, hive.support.concurrency (true),\n" + diff --git hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/AbstractHCatLoaderTest.java hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/AbstractHCatLoaderTest.java index 59d2efb156..031708db0e 100644 --- hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/AbstractHCatLoaderTest.java +++ hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/AbstractHCatLoaderTest.java @@ -107,7 +107,7 @@ private void createTable(String tablename, String schema, String partitionedBy) static void createTable(String tablename, String schema, String partitionedBy, Driver driver, String storageFormat) throws IOException, CommandNeedRetryException { String createTable; - createTable = "create table " + tablename + "(" + schema + ") "; + createTable = "create external table " + tablename + "(" + schema + ") "; if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) { createTable = createTable + "partitioned by (" + partitionedBy + ") "; } diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index d3df015288..dce4cec051 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1303,6 +1303,9 @@ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnMa // If we've opened a transaction we need to commit or rollback rather than explicitly // releasing the locks. conf.unset(ValidTxnList.VALID_TXNS_KEY); + if(!checkConcurrency()) { + return; + } if (txnMgr.isTxnOpen()) { if (commit) { if(conf.getBoolVar(ConfVars.HIVE_IN_TEST) && conf.getBoolVar(ConfVars.HIVETESTMODEROLLBACKTXN)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index fdb3603338..b605f75d3a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -186,7 +186,9 @@ public void run() { void setHiveConf(HiveConf conf) { super.setHiveConf(conf); if (!conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY)) { - throw new RuntimeException(ErrorMsg.DBTXNMGR_REQUIRES_CONCURRENCY.getMsg()); + //todo: hack for now - many (esp hcat) tests explicitly set concurrency to false so then + //since DbTxnManager is now default, this throws... + //throw new RuntimeException(ErrorMsg.DBTXNMGR_REQUIRES_CONCURRENCY.getMsg()); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 50bdce89a4..b2c671cdaa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -3376,16 +3376,30 @@ private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, final String fullname = sourcePath.getName(); final String name = FilenameUtils.getBaseName(sourcePath.getName()); final String type = FilenameUtils.getExtension(sourcePath.getName()); + final boolean isAcidTarget = destDirPath.getName().startsWith(AcidUtils.BASE_PREFIX) || destDirPath.getName().startsWith(AcidUtils.DELTA_PREFIX); + Path destFilePath; + if(isAcidTarget && !AcidUtils.originalBucketFilter.accept(sourcePath)) { + //if here we are doing a load Data into acid table - todo: make this more explicit + //Acid tables can only deal with files matching AcidUtils.originalBucketFilter. + //so here we rename the input file and further logic will add a copy_N suffix in case of + //collisions. (This works since Load Data doesn't support bucketed tables for now) + destFilePath = new Path(destDirPath, "000000_0"); + } + else { + destFilePath = new Path(destDirPath, fullname); + } - Path destFilePath = new Path(destDirPath, fullname); - - /* + /* * The below loop may perform bad when the destination file already exists and it has too many _copy_ * files as well. A desired approach was to call listFiles() and get a complete list of files from * the destination, and check whether the file exists or not on that list. However, millions of files * could live on the destination directory, and on concurrent situations, this can cause OOM problems. * * I'll leave the below loop for now until a better approach is found. + * + * This is problematic: caller of mvFile() may use a thread pool to move files in parallel in + * which case there is a race condition between exists() and reaname() from different threads. + * I suppose in case of collisions the FileSystem will throw and the command will fail. */ for (int counter = 1; destFs.exists(destFilePath); counter++) { if (isOverwrite) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index cc956da575..8b5c5e4540 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -160,7 +160,7 @@ private URI initializeFromURI(String fromPath, boolean isLocal) throws IOExcepti "source contains directory: " + oneSrc.getPath().toString())); } if(AcidUtils.isFullAcidTable(table)) { - if(!AcidUtils.originalBucketFilter.accept(oneSrc.getPath())) { + if(false && !AcidUtils.originalBucketFilter.accept(oneSrc.getPath())) { //acid files (e.g. bucket_0000) have ROW_ID embedded in them and so can't be simply //copied to a table so only allow non-acid files for now throw new SemanticException(ErrorMsg.ACID_LOAD_DATA_INVALID_FILE_NAME, diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java index b98c74a889..823acc846e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java @@ -18,25 +18,16 @@ package org.apache.hadoop.hive.ql; import org.apache.commons.io.FileUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; -import org.apache.hadoop.io.NullWritable; import org.junit.Assert; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; import java.util.List; /** @@ -101,7 +92,7 @@ private void loadDataUpdate(boolean isVectorized) throws Exception { runStatementOnDriver( "create table T (a int, b int) stored as orc tblproperties('transactional'='true')"); //Tstage is just a simple way to generate test data - runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')"); runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); //this creates an ORC data file with correct schema under table root runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); @@ -169,7 +160,7 @@ private void loadData(boolean isVectorized) throws Exception { runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')"); runStatementOnDriver("insert into T values(0,2),(0,4)"); //Tstage is just a simple way to generate test data - runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')"); runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); //this creates an ORC data file with correct schema under table root runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'"); @@ -239,10 +230,10 @@ private void loadData(boolean isVectorized) throws Exception { private void loadDataNonAcid2AcidConversion(boolean isVectorized) throws Exception { runStatementOnDriver("drop table if exists T"); runStatementOnDriver("drop table if exists Tstage"); - runStatementOnDriver("create table T (a int, b int) stored as orc"); + runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='false')"); //per acid write to test nonAcid2acid conversion mixed with load data runStatementOnDriver("insert into T values(0,2),(0,4)"); - runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')"); runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); //make 2 more inserts so that we have 000000_0_copy_1, 000000_0_copy_2 files in export //export works at file level so if you have copy_N in the table dir, you'll have those in output @@ -311,7 +302,7 @@ public void loadDataPartitioned() throws Exception { runStatementOnDriver("drop table if exists T"); runStatementOnDriver("drop table if exists Tstage"); runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc tblproperties('transactional'='true')"); - runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')"); runStatementOnDriver("insert into Tstage values(0,2),(0,4)"); runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'"); @@ -365,7 +356,7 @@ public void testValidations() throws Exception { runStatementOnDriver("create table T (a int, b int) clustered by (a) into 2 buckets stored as orc tblproperties('transactional'='true')"); File createdFile= folder.newFile("myfile.txt"); FileUtils.writeStringToFile(createdFile, "hello world"); - runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')"); //this creates an ORC data file with correct schema under table root runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); CommandProcessorResponse cpr = runStatementOnDriverNegative("load data local inpath '" + getWarehouseDir() + "' into table T"); @@ -393,7 +384,7 @@ private void testMultiStatement(boolean isVectorized) throws Exception { runStatementOnDriver("drop table if exists Tstage"); runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')"); //Tstage is just a simple way to generate test data - runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')"); runStatementOnDriver("insert into Tstage values(5,5),(6,6)"); //this creates an ORC data file with correct schema under table root runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); @@ -434,7 +425,7 @@ public void testAbort() throws Exception { runStatementOnDriver("drop table if exists Tstage"); runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')"); //Tstage is just a simple way to generate test data - runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')"); runStatementOnDriver("insert into Tstage values(5,5),(6,6)"); //this creates an ORC data file with correct schema under table root runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); @@ -464,4 +455,26 @@ private void checkResult(String[][] expectedResult, String query, boolean isVect checkExpected(rs, expectedResult, msg + (isVectorized ? " vect" : ""), LOG, !isVectorized); assertVectorized(isVectorized, query); } + @Test + public void testAnyFileName() throws Exception { + boolean isVectorized = false; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T ( ctinyint TINYINT,\n" + + " csmallint SMALLINT,\n" + + " cint INT,\n" + + " cbigint BIGINT,\n" + + " cfloat FLOAT,\n" + + " cdouble DOUBLE,\n" + + " cstring1 STRING,\n" + + " cstring2 STRING,\n" + + " ctimestamp1 TIMESTAMP,\n" + + " ctimestamp2 TIMESTAMP,\n" + + " cboolean1 BOOLEAN,\n" + + " cboolean2 BOOLEAN) stored as orc tblproperties('transactional'='true')"); + //ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1512791382683/warehouse + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/../../../../../data/files/alltypesorc' into table T"); + List rs = runStatementOnDriver("select count(*) from T"); + Assert.assertEquals("12288", rs.get(0)); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index 780b97cb51..0bebb796b6 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -696,5 +696,20 @@ public void testCompactStatsGather() throws Exception { map = hms.getPartitionColumnStatistics("default","T", partNames, colNames); Assert.assertEquals("", 5, map.get(partNames.get(0)).get(0).getStatsData().getLongStats().getHighValue()); } + @Test + public void testDefault() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as orc"); + runStatementOnDriver("insert into T values(1,2),(3,4)"); + String query = "select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b"; + List rs = runStatementOnDriver(query); + String[][] expected = { + //this proves data is written in Acid layout so T was made Acid + {"{\"transactionid\":15,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000015_0000015_0000/bucket_00000"}, + {"{\"transactionid\":15,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000015_0000015_0000/bucket_00000"} + }; + checkExpected(rs, expected, "insert data"); + + } } diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index da1031300a..e0b0b5530e 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.metastore; import java.io.IOException; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -34,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent; import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent; import org.apache.hadoop.hive.metastore.events.PreEventContext; @@ -185,6 +187,40 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw } /** + * Want to make a a newly create table Acid (unless it explicitly has transactional=true param) + * if table can support it. + */ + private void makeAcid(Table newTable) throws MetaException { + Configuration conf = MetastoreConf.newMetastoreConf(); + if("full".equalsIgnoreCase(MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.ACID_DEFAULT))) { +// if(!MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.HIVE_SUPPORT_CONCURRENCY)) { +// //some tests explicitly set concurrency=false which then +// LOG.info("Could not make " + newTable.getDbName() + "." + newTable.getTableName() + " acid: concurrency=false explicitly"); +// return;//so just bail +// } + if(!conformToAcid(newTable)) { + LOG.info("Could not make " + newTable.getDbName() + "." + newTable.getTableName() + " acid: wrong IO format"); + return; + } + if(!TableType.MANAGED_TABLE.toString().equalsIgnoreCase(newTable.getTableType())) { + //todo should this check be in conformToAcid()? + LOG.info("Could not make " + newTable.getDbName() + "." + newTable.getTableName() + " acid: it's " + newTable.getTableType()); + return; + } + if(newTable.getSd().getSortColsSize() > 0) { + LOG.info("Could not make " + newTable.getDbName() + "." + newTable.getTableName() + " acid: it's sorted"); + return; + } + //check if orc and not sorted + Map parameters = newTable.getParameters(); + if (parameters == null || parameters.isEmpty()) { + parameters = new HashMap<>(); + } + parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true"); + newTable.setParameters(parameters); + } + } + /** * Normalize case and make sure: * 1. 'true' is the only value to be set for 'transactional' (if set at all) * 2. If set to 'true', we should also enforce bucketing and ORC format @@ -193,6 +229,7 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr Table newTable = context.getTable(); Map parameters = newTable.getParameters(); if (parameters == null || parameters.isEmpty()) { + makeAcid(newTable); return; } String transactional = null; @@ -212,6 +249,7 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr } if (transactional == null) { + makeAcid(newTable); return; } diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index b46cc38a22..325318712a 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -215,6 +215,12 @@ public static ConfVars getMetaConf(String name) { public enum ConfVars { // alpha order, PLEASE! + ACID_DEFAULT("metastore.acid.default", "metastore.acid.default", "full", + new Validator.StringSet("none",//default not acid, i.e. leave the tbl as is + "all",//if possible make full acid, if not MM, if still not possible should be External tbl + "full",//if possible make full acid else do nothing + "mm"//if possible make table MM else should be External + ), "For testing. Causes the system to make suitable tables acid/mm automatically."), ADDED_JARS("metastore.added.jars.path", "hive.added.jars.path", "", "This an internal parameter."), AGGREGATE_STATS_CACHE_CLEAN_UNTIL("metastore.aggregate.stats.cache.clean.until", @@ -826,7 +832,7 @@ public static ConfVars getMetaConf(String name) { // The metastore shouldn't care what txn manager Hive is running, but in various tests it // needs to set these values. We should do the work to detangle this. HIVE_TXN_MANAGER("hive.txn.manager", "hive.txn.manager", - "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager", + "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager", "Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive\n" + "transactions, which also requires appropriate settings for hive.compactor.initiator.on,\n" + "hive.compactor.worker.threads, hive.support.concurrency (true),\n" + @@ -835,7 +841,7 @@ public static ConfVars getMetaConf(String name) { "no transactions."), // Metastore always support concurrency, but certain ACID tests depend on this being set. We // need to do the work to detangle this - HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", "hive.support.concurrency", false, + HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", "hive.support.concurrency", true, "Whether Hive supports concurrency control or not. \n" + "A ZooKeeper instance must be up and running when using zookeeper Hive lock manager "),