diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index eed37a1937..52e67ad27b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2209,7 +2209,12 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType
     }

     // Note: this assumes both paths are qualified; which they are, currently.
-    if (isMmTable && loadPath.equals(tbl.getPath())) {
+    if ((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) {
+      /**
+       * some operations on Transactional tables (e.g. Import) write directly to the final location
+       * and avoid the 'move' operation. Since MoveTask does other things, setting 'loadPath' to be
+       * the table/partition path indicates that the 'file move' part of MoveTask is not needed.
+       */
       if (Utilities.FILE_OP_LOGGER.isDebugEnabled()) {
         Utilities.FILE_OP_LOGGER.debug(
             "not moving " + loadPath + " to " + tbl.getPath() + " (MM)");
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 8b639f7922..6bc0e7c45b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
@@ -246,6 +247,7 @@ public static boolean prepareImport(boolean isImportCmd,
     } catch (Exception e) {
       throw new HiveException(e);
     }
+    //todo: should this not be done after we decided if we are creating a new table vs using existing?
     boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps());

     if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
@@ -325,22 +327,17 @@ public static boolean prepareImport(boolean isImportCmd,
     }

     Long writeId = 0L; // Initialize with 0 for non-ACID and non-MM tables.
+    int stmtId = 0;
     if (((table != null) && AcidUtils.isTransactionalTable(table)) ||
         AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps())) {
       // Explain plan doesn't open a txn and hence no need to allocate write id.
       if (x.getCtx().getExplainConfig() == null) {
-        writeId = SessionState.get().getTxnMgr().getTableWriteId(tblDesc.getDatabaseName(), tblDesc.getTableName());
+        HiveTxnManager tm = SessionState.get().getTxnMgr();
+        writeId = tm.getTableWriteId(tblDesc.getDatabaseName(), tblDesc.getTableName());
+        stmtId = tm.getStmtIdAndIncrement();
       }
     }

-    int stmtId = 0;
-    // TODO [MM gap?]: bad merge; tblDesc is no longer CreateTableDesc, but ImportTableDesc.
-    // We need to verify the tests to see if this works correctly.
-    /*
-    if (isAcid(writeId)) {
-      tblDesc.setInitialMmWriteId(writeId);
-    }
-    */
     if (!replicationSpec.isInReplicationScope()) {
       createRegularImportTasks(
           tblDesc, partitionDescs,
@@ -382,17 +379,25 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
     return tblDesc;
   }

-
   private static Task loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
       ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
-      Long writeId, int stmtId, boolean isSourceMm) {
+      Long writeId, int stmtId) {
+    assert table != null;
     Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
     Path destPath = null, loadPath = null;
     LoadFileType lft;
-    if (AcidUtils.isInsertOnlyTable(table)) {
+    if (AcidUtils.isTransactionalTable(table)) {
       String mmSubdir = replace ? AcidUtils.baseDir(writeId)
           : AcidUtils.deltaSubdir(writeId, writeId, stmtId);
       destPath = new Path(tgtPath, mmSubdir);
+      /**
+       * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition
+       * directory, i.e. the final destination for these files. This has to be a copy to preserve
+       * the archive. MoveTask is optimized to do a 'rename' if files are on the same FileSystem.
+       * So setting 'loadPath' this way will make
+       * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean, boolean, Long, int)}
+       * skip the unnecessary file (rename) operation but still perform the rest of its work.
+       */
       loadPath = tgtPath;
       lft = LoadFileType.KEEP_EXISTING;
     } else {
@@ -404,7 +409,11 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
       Utilities.FILE_OP_LOGGER.trace("adding import work for table with source location: "
           + dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " + writeId +
-          " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName()));
+          " for " + table.getTableName() + ": " +
+          (AcidUtils.isFullAcidTable(table) ? "acid" :
+              (AcidUtils.isInsertOnlyTable(table) ? "mm" : "flat")
+          )
+      );
     }

     Task copyTask = null;
@@ -412,13 +421,14 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
       copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf());
     } else {
       CopyWork cw = new CopyWork(dataPath, destPath, false);
-      cw.setSkipSourceMmDirs(isSourceMm);
+      cw.setSkipSourceMmDirs(AcidUtils.isInsertOnlyTable(table));
       copyTask = TaskFactory.get(cw);
     }

     LoadTableDesc loadTableWork = new LoadTableDesc(
         loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
     loadTableWork.setStmtId(stmtId);
+    //if Importing into existing table, FileFormat is checked in ImportSemanticAnalyzer.checkTable()
     MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false);
     Task loadTableTask = TaskFactory.get(mv, x.getConf());
     copyTask.addDependentTask(loadTableTask);
@@ -426,14 +436,6 @@ private static boolean isAcid(Long writeId) {
     return loadTableTask;
   }

-  /**
-   * todo: this is odd: write id allocated for all write operations on ACID tables. what is this supposed to check?
-   */
-  @Deprecated
-  private static boolean isAcid(Long writeId) {
-    return (writeId != null) && (writeId != 0);
-  }
-
   private static Task createTableTask(ImportTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x){
     return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf());
   }
@@ -486,9 +488,9 @@ private static boolean isAcid(Long writeId) {
             + partSpecToString(partSpec.getPartSpec())
             + " with source location: " + srcLocation);
       Path tgtLocation = new Path(partSpec.getLocation());
-      Path destPath = !AcidUtils.isInsertOnlyTable(table.getParameters()) ? x.getCtx().getExternalTmpPath(tgtLocation)
+      Path destPath = !AcidUtils.isTransactionalTable(table.getParameters()) ? x.getCtx().getExternalTmpPath(tgtLocation)
           : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId));
-      Path moveTaskSrc = !AcidUtils.isInsertOnlyTable(table.getParameters()) ? destPath : tgtLocation;
+      Path moveTaskSrc = !AcidUtils.isTransactionalTable(table.getParameters()) ? destPath : tgtLocation;
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("adding import work for partition with source location: "
             + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
@@ -591,7 +593,7 @@ public static String partSpecToString(Map partSpec) {
     return sb.toString();
   }

-  public static void checkTable(Table table, ImportTableDesc tableDesc,
+  private static void checkTable(Table table, ImportTableDesc tableDesc,
       ReplicationSpec replicationSpec, HiveConf conf)
       throws SemanticException, URISyntaxException {
     // This method gets called only in the scope that a destination table already exists, so
@@ -624,7 +626,7 @@ public static void checkTable(Table table, ImportTableDesc tableDesc,
     // table in the statement, if a destination partitioned table exists, so long as it is actually
     // not external itself. Is that the case? Why?
     {
-      if ( (tableDesc.isExternal()) // IMPORT statement speicified EXTERNAL
+      if ( (tableDesc.isExternal()) // IMPORT statement specified EXTERNAL
          && (!table.isPartitioned() || !table.getTableType().equals(TableType.EXTERNAL_TABLE))
          ){
         throw new SemanticException(ErrorMsg.INCOMPATIBLE_SCHEMA.getMsg(
@@ -841,7 +843,7 @@ private static void createRegularImportTasks(
         Path tgtPath = new Path(table.getDataLocation().toString());
         FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
         checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG());
-        loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId, isSourceMm);
+        loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId);
       }
       // Set this to read because we can't overwrite any existing partitions
       x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
@@ -885,7 +887,7 @@ private static void createRegularImportTasks(
           tblproperties.put("transactional_properties", "insert_only");
           table.setParameters(tblproperties);
         }
-        t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, writeId, stmtId, isSourceMm));
+        t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, writeId, stmtId));
       }
     }
     x.getTasks().add(t);
@@ -997,7 +999,7 @@ private static void createReplImportTasks(
         }
       } else {
         x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
-        t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId, isSourceMm));
+        t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId));
       }
     }
     // Simply create
@@ -1059,7 +1061,7 @@ private static void createReplImportTasks(
       if (!replicationSpec.isMetadataOnly()) {
         // repl-imports are replace-into unless the event is insert-into
         loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI),
-            replicationSpec, x, writeId, stmtId, isSourceMm);
+            replicationSpec, x, writeId, stmtId);
       } else {
         x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
       }
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
index 0fee075df2..799a3a02e7 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -472,4 +472,70 @@ public void testLoadAcidFile() throws Exception {
     Assert.assertEquals("Unexpected error code", ErrorMsg.LOAD_DATA_ACID_FILE.getErrorCode(),
         cpr.getErrorCode());
   }
+  @Test
+  public void testImport() throws Exception {
+    testImport(false);
+  }
+  @Test
+  public void testImportVectorized() throws Exception {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    testImport(true);
+  }
+
+  /**
+   * MM tables already work - mm_exim.q
+   * Export creates a bunch of metadata in addition to the data, including all table props/IF/OF etc.
+   * Import from an 'export' can create a table (under any specified name) or add data into an existing table.
+   * If importing into an existing (un-partitioned) table, it must be empty.
+   * If Import is creating a table, it will be exactly like the exported one except for the name.
+   *
+   * This tests importing into an existing table - currently that is the only way to make the
+   * target table Acid. See below.
+   *
+   * See, for example, import_addpartition_blobstore_to_warehouse.q - Export of a partitioned table
+   * creates something that you can use to import individual partitions from.
+   * Also, Import can apparently create the target table.
+   * Given how export is implemented, Import can't create a transactional target table... Perhaps
+   * the 'temp' table could be altered to Acid after it's written; in that case it would have a
+   * flat table structure but correct tbl props.
+   */
+  private void testImport(boolean isVectorized) throws Exception {
+    MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true);
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("drop table if exists Tstage");
+    runStatementOnDriver("create table T (a int, b int) stored as orc");
+    //Tstage is just a simple way to generate test data
+    runStatementOnDriver("create table Tstage (a int, b int) stored as orc tblproperties('transactional'='false')");
+    //this creates an ORC data file with the correct schema under the table root
+    runStatementOnDriver("insert into Tstage values(1,2),(3,4),(5,6)");
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+    //runStatementOnDriver("truncate table Tstage");
+
+    //todo: this supports an optional LOCATION param - seems to set the data location of the new table - ignore for Acid? throw?
+    //load into existing empty table T
+    runStatementOnDriver("import table T from '" + getWarehouseDir() + "/1'");
+
+    String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
+        "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
+    String[][] expected = new String[][] {
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "t/delta_0000001_0000001_0000/000000_0"}};
+    checkResult(expected, testQuery, isVectorized, "import existing table");
+
+    runStatementOnDriver("update T set a = 0 where b = 6");
+    String[][] expected2 = new String[][] {
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"},
+        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t6", "t/delta_0000002_0000002_0000/bucket_00000"}};
+    checkResult(expected2, testQuery, isVectorized, "update imported table");
+
+    runStatementOnDriver("alter table T compact 'minor'");
+    TestTxnCommands2.runWorker(hiveConf);
+    String[][] expected3 = new String[][] {
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000002/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000002/bucket_00000"},
+        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t6", "t/delta_0000001_0000002/bucket_00000"}};
+    checkResult(expected3, testQuery, isVectorized, "minor compact imported table");
+  }
 }