diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 1faa50a86a..f3e40eb6dc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -457,6 +457,10 @@ HIVE_GROUPING_SETS_SIZE_LIMIT(10411, "Grouping sets size cannot be greater than 64"), REBUILD_NO_MATERIALIZED_VIEW(10412, "Rebuild command only valid for materialized views"), + LOAD_DATA_ACID_FILE(10413, + "\"{0}\" was created created by Acid write - it cannot be loaded into anther Acid table", + true), + //========================== 20000 range starts here ========================// diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 1828f0a531..a9ebc90b5b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -1653,6 +1653,9 @@ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOE //directory is empty or doesn't have any that could have been produced by load data return false; } + return isRawFormatFile(dataFile, fs); + } + public static boolean isRawFormatFile(Path dataFile, FileSystem fs) throws IOException { try { Reader reader = OrcFile.createReader(dataFile, OrcFile.readerOptions(fs.getConf())); /* diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index d5aace092e..e49089b91e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -150,7 +150,8 @@ private URI initializeFromURI(String fromPath, boolean isLocal) throws IOExcepti } try { - srcs = matchFilesOrDir(FileSystem.get(fromURI, conf), new Path(fromURI)); + FileSystem fileSystem = FileSystem.get(fromURI, conf); + srcs = matchFilesOrDir(fileSystem, new Path(fromURI)); if (srcs == null || srcs.length == 0) { throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast, "No files matching path " + fromURI)); @@ -162,6 +163,7 @@ private URI initializeFromURI(String fromPath, boolean isLocal) throws IOExcepti "source contains directory: " + oneSrc.getPath().toString())); } } + validateAcidFiles(table, srcs, fileSystem); // Do another loop if table is bucketed List bucketCols = table.getBucketCols(); if (bucketCols != null && !bucketCols.isEmpty()) { @@ -198,12 +200,24 @@ private URI initializeFromURI(String fromPath, boolean isLocal) throws IOExcepti if (bucketArray[bucketId]) { throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg( "Multiple files for same bucket : " + bucketId - + ". Only 1 file per bucket allowed in single load command. To load multiple files for same bucket, use multiple statements for table " + + ". Only 1 file per bucket allowed in single load command. To load " + + "multiple files for same bucket, use multiple statements for table " + table.getFullyQualifiedName())); } bucketArray[bucketId] = true; } } + else { + /** + * for loading into un-bucketed acid table, files can be named arbitrarily but they will + * be renamed during load. + * {@link Hive#mvFile(HiveConf, FileSystem, Path, FileSystem, Path, boolean, boolean, + * boolean, int)} + * and + * {@link Hive#copyFiles(HiveConf, FileSystem, FileStatus[], FileSystem, Path, boolean, + * boolean, List, boolean)} + */ + } } catch (IOException e) { // Has to use full name to make sure it does not conflict with // org.apache.commons.lang.StringUtils @@ -213,6 +227,28 @@ private URI initializeFromURI(String fromPath, boolean isLocal) throws IOExcepti return Lists.newArrayList(srcs); } + /** + * Safety check to make sure a file take from one acid table is not added into another acid table + * since the ROW__IDs embedded as part a write to one table won't make sense in different + * table/cluster. + */ + private static void validateAcidFiles(Table table, FileStatus[] srcs, FileSystem fs) + throws SemanticException { + if(!AcidUtils.isFullAcidTable(table)) { + return; + } + try { + for (FileStatus oneSrc : srcs) { + if (!AcidUtils.MetaDataFile.isRawFormatFile(oneSrc.getPath(), fs)) { + throw new SemanticException(ErrorMsg.LOAD_DATA_ACID_FILE, oneSrc.getPath().toString()); + } + } + } + catch(IOException ex) { + throw new SemanticException(ex); + } + } + @Override public void analyzeInternal(ASTNode ast) throws SemanticException { boolean isLocal = false; diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java index 8a01de37fc..3710311f80 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java @@ -19,6 +19,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.junit.Assert; import org.junit.Rule; @@ -453,9 +454,30 @@ public void testAbort() throws Exception { * which will currently make the query non-vectorizable. This means we can't check the file name * for vectorized version of the test. */ - private void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg) throws Exception{ + private void checkResult(String[][] expectedResult, String query, boolean isVectorized, + String msg) throws Exception{ List rs = runStatementOnDriver(query); checkExpected(rs, expectedResult, msg + (isVectorized ? " vect" : ""), LOG, !isVectorized); assertVectorized(isVectorized, query); } + @Test + public void testLoadAcidFile() throws Exception { + MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists T2"); + runStatementOnDriver( + "create table T (a int, b int) stored as orc"); + //This is just a simple way to generate test data + runStatementOnDriver("create table T2(a int, b int) stored as orc"); + runStatementOnDriver("insert into T values(1,2)"); + List rs = runStatementOnDriver("select INPUT__FILE__NAME from T"); + Assert.assertEquals(1, rs.size()); + Assert.assertTrue("Unexpcted file name", rs.get(0) + .endsWith("t/delta_0000001_0000001_0000/bucket_00000")); + //T2 is an acid table so this should fail + CommandProcessorResponse cpr = runStatementOnDriverNegative( + "load data local inpath '" + rs.get(0) + "' into table T2"); + Assert.assertEquals("Unexpected error code", + ErrorMsg.LOAD_DATA_ACID_FILE.getErrorCode(), cpr.getErrorCode()); + } }