diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 004aaf7..7e1b8fa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -730,6 +730,9 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
       Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
       // only create bucket files only if no dynamic partitions,
       // buckets of dynamic partitions will be created for each newly created partition
+      // TODO: IOW integration. Full ACID uses the else-if block below to create Acid's RecordUpdater
+      // (via HiveFileFormatUtils), which sets writingBase(conf.getInsertOverwrite()).
+      // If MM wants to create a new base dir for IOW (instead of a delta dir), it should specify that here.
       if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
         Path outPath = fsp.outPaths[filesIdx];
         if (conf.isMmTable()
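Note on the TODO above: the patch leaves MM INSERT OVERWRITE unresolved here. As a rough sketch only (the helper below is hypothetical and not part of this patch), an MM writer that wanted a new base for IOW could pick its output subdirectory roughly like this, mirroring the ACID base_/delta_ naming that AcidUtils produces:

import org.apache.hadoop.fs.Path;

// Hypothetical illustration only; FileSinkOperator does not contain this helper.
public class MmIowTargetSketch {
  // isInsertOverwrite would come from conf.getInsertOverwrite() in the real operator.
  static Path chooseMmWriteDir(Path tableOrPartitionDir, long txnId, int stmtId, boolean isInsertOverwrite) {
    String subdir = isInsertOverwrite
        ? String.format("base_%07d", txnId)                            // IOW: new base replaces existing data
        : String.format("delta_%07d_%07d_%04d", txnId, txnId, stmtId); // plain insert: append a delta
    return new Path(tableOrPartitionDir, subdir);
  }
}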
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 71d01ed..61e0d9b 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configurable;
@@ -497,40 +498,37 @@ private static void processForWriteIds(Path dir, JobConf conf,
       finalPaths.add(dir);
       return;
     }
-    FileStatus[] files = fs.listStatus(dir); // TODO: batch?
+
+    // Tez requires the use of recursive input dirs for union processing, so we have to look
+    // inside the directory to find the actual inputs.
     LinkedList<Path> subdirs = new LinkedList<>();
-    for (FileStatus file : files) {
-      handleNonMmDirChild(file, validTxnList, subdirs, finalPaths);
-    }
+    subdirs.add(dir); // add the directory itself as the starting point
     while (!subdirs.isEmpty()) {
-      Path subdir = subdirs.poll();
-      for (FileStatus file : fs.listStatus(subdir)) {
-        handleNonMmDirChild(file, validTxnList, subdirs, finalPaths);
+      Path currDir = subdirs.poll();
+      FileStatus[] files = fs.listStatus(currDir);
+      boolean hadAcidState = false; // whether getAcidState has been called for currDir
+      for (FileStatus file : files) {
+        Path path = file.getPath();
+        Utilities.LOG14535.warn("Checking " + path + " for inputs");
+        if (!file.isDirectory()) {
+          Utilities.LOG14535.warn("Ignoring a file not in MM directory " + path);
+        } else if (JavaUtils.extractTxnId(path) == null) {
+          subdirs.add(path);
+        } else {
+          if (!hadAcidState) {
+            AcidUtils.Directory dirInfo = AcidUtils.getAcidState(currDir, conf, validTxnList, Ref.from(false), true, null);
+            hadAcidState = true;
+            // TODO: for IOW we also need to take the base dir, if any, into account
+            for (AcidUtils.ParsedDelta delta : dirInfo.getCurrentDirectories()) {
+              Utilities.LOG14535.info("Adding input " + delta.getPath());
+              finalPaths.add(delta.getPath());
+            }
+          }
+        }
       }
     }
   }
 
-  private static void handleNonMmDirChild(FileStatus file, ValidTxnList validTxnList,
-      LinkedList<Path> subdirs, List<Path> finalPaths) {
-    Path path = file.getPath();
-    Utilities.LOG14535.warn("Checking " + path + " for inputs");
-    if (!file.isDirectory()) {
-      Utilities.LOG14535.warn("Ignoring a file not in MM directory " + path);
-      return;
-    }
-    Long txnId = JavaUtils.extractTxnId(path);
-    if (txnId == null) {
-      subdirs.add(path);
-      return;
-    }
-    if (!validTxnList.isTxnValid(txnId)) {
-      Utilities.LOG14535.warn("Ignoring an uncommitted directory " + path);
-      return;
-    }
-    Utilities.LOG14535.info("Adding input " + path);
-    finalPaths.add(path);
-  }
-
   Path[] getInputPaths(JobConf job) throws IOException {
     Path[] dirs;
     if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
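The rewritten processForWriteIds above walks the table directory breadth-first, descends into non-transactional subdirectories (the Tez union case), and asks AcidUtils.getAcidState once per directory that contains txn-id-named children. A simplified, self-contained sketch of that traversal shape (it matches the delta_* naming convention directly instead of going through AcidUtils and the ValidTxnList, so it is an illustration, not the patched method):

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MmInputWalkSketch {
  static List<Path> collectDeltaDirs(FileSystem fs, Path root) throws IOException {
    List<Path> finalPaths = new ArrayList<>();
    Deque<Path> subdirs = new ArrayDeque<>();
    subdirs.add(root); // start from the table/partition directory itself
    while (!subdirs.isEmpty()) {
      Path currDir = subdirs.poll();
      for (FileStatus file : fs.listStatus(currDir)) {
        Path path = file.getPath();
        if (!file.isDirectory()) {
          continue; // plain files directly under an MM table dir are ignored
        }
        if (path.getName().startsWith("delta_")) {
          // The real code calls AcidUtils.getAcidState() once per directory at this point and
          // adds only the current, committed deltas from dirInfo.getCurrentDirectories().
          finalPaths.add(path);
        } else {
          subdirs.add(path); // e.g. Tez union subdirectories; keep descending
        }
      }
    }
    return finalPaths;
  }
}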
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index ae52d15..5ac15af 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -302,15 +302,8 @@ public static boolean prepareImport(
       tableExists = true;
     }
 
-    Long txnId = null;
+    Long txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
     int stmtId = 0;
-    if (table != null && MetaStoreUtils.isInsertOnlyTable(table.getParameters())) {
-      txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
-    } else if (table == null && isSourceMm) {
-      // We could import everything as is - directories and IDs, but that won't work with ACID
-      // txn ids in future. So, let's import everything into the new MM directory with ID == 0.
-      txnId = 0l;
-    }
     //todo due to the master merge, tblDesc is no longer CreateTableDesc, but ImportTableDesc
     /*
     if (txnId != null) {
@@ -363,7 +356,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
       ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
       Long txnId, int stmtId, boolean isSourceMm) {
     Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
-    Path destPath = txnId == null ? x.getCtx().getExternalTmpPath(tgtPath)
+    Path destPath = !MetaStoreUtils.isInsertOnlyTable(table.getParameters()) ? x.getCtx().getExternalTmpPath(tgtPath)
         : new Path(tgtPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
     Utilities.LOG14535.info("adding import work for table with source location: "
         + dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm "
@@ -453,9 +446,9 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
           + partSpecToString(partSpec.getPartSpec())
           + " with source location: " + srcLocation);
       Path tgtLocation = new Path(partSpec.getLocation());
-      Path destPath = txnId == null ? x.getCtx().getExternalTmpPath(tgtLocation)
+      Path destPath = !MetaStoreUtils.isInsertOnlyTable(table.getParameters()) ? x.getCtx().getExternalTmpPath(tgtLocation)
          : new Path(tgtLocation, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
-      Path moveTaskSrc = txnId == null ? destPath : tgtLocation;
+      Path moveTaskSrc = !MetaStoreUtils.isInsertOnlyTable(table.getParameters()) ? destPath : tgtLocation;
       Utilities.LOG14535.info("adding import work for partition with source location: "
           + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
           + txnId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec()));
@@ -788,7 +781,8 @@ private static void createRegularImportTasks(
     if (table.isPartitioned()) {
       x.getLOG().debug("table partitioned");
       Task<?> ict = createImportCommitTask(
-          table.getDbName(), table.getTableName(), txnId, stmtId, x.getConf());
+          table.getDbName(), table.getTableName(), txnId, stmtId, x.getConf(),
+          MetaStoreUtils.isInsertOnlyTable(table.getParameters()));
 
       for (AddPartitionDesc addPartitionDesc : partitionDescs) {
         Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
@@ -824,7 +818,8 @@
       if (isPartitioned(tblDesc)) {
         Task<?> ict = createImportCommitTask(
-            tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, stmtId, x.getConf());
+            tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, stmtId, x.getConf(),
+            MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps()));
 
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
           t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
              replicationSpec, x, txnId, stmtId, isSourceMm, ict));
@@ -858,9 +853,9 @@
   }
 
   private static Task<?> createImportCommitTask(
-      String dbName, String tblName, Long txnId, int stmtId, HiveConf conf) {
+      String dbName, String tblName, Long txnId, int stmtId, HiveConf conf, boolean isMmTable) {
     @SuppressWarnings("unchecked")
-    Task<?> ict = (txnId == null) ? null : TaskFactory.get(
+    Task<?> ict = (!isMmTable) ? null : TaskFactory.get(
        new ImportCommitWork(dbName, tblName, txnId, stmtId), conf);
     return ict;
   }
@@ -942,7 +937,8 @@
     if (!replicationSpec.isMetadataOnly()) {
       if (isPartitioned(tblDesc)) {
         Task<?> ict = createImportCommitTask(
-            tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, stmtId, x.getConf());
+            tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, stmtId, x.getConf(),
+            MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps()));
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
           addPartitionDesc.setReplicationSpec(replicationSpec);
           t.addDependentTask(
@@ -970,7 +966,8 @@
         Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
         org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
         Task<?> ict = replicationSpec.isMetadataOnly() ? null : createImportCommitTask(
-            tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, stmtId, x.getConf());
+            tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, stmtId, x.getConf(),
+            MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps()));
         if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
           if (!replicationSpec.isMetadataOnly()){
             x.getTasks().add(addSinglePartition(
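To make the import-path decision above explicit: the patch keys everything off MetaStoreUtils.isInsertOnlyTable() instead of a nullable txnId. A hedged, hypothetical helper (not present in ImportSemanticAnalyzer, which inlines this logic) spelling out the partition case:

import org.apache.hadoop.fs.Path;

// Hypothetical helper for illustration only.
final class MmImportPathsSketch {
  final Path copyDest;    // where the copy task writes the imported files
  final Path moveTaskSrc; // what the follow-up move task is pointed at

  MmImportPathsSketch(Path tgtLocation, Path externalTmpPath, boolean isInsertOnlyTable,
      long txnId, int stmtId) {
    if (isInsertOnlyTable) {
      // MM table: copy straight into a delta dir under the partition, and let the move task
      // work on the partition location itself (mirrors AcidUtils.deltaSubdir(txnId, txnId, stmtId)).
      this.copyDest = new Path(tgtLocation, String.format("delta_%07d_%07d_%04d", txnId, txnId, stmtId));
      this.moveTaskSrc = tgtLocation;
    } else {
      // Non-MM table: copy to a scratch location and move that into place afterwards.
      this.copyDest = externalTmpPath;
      this.moveTaskSrc = this.copyDest;
    }
  }
}

The same flag also gates createImportCommitTask(): with the new isMmTable parameter the commit task is only created for insert-only tables, where a non-null txnId used to play that role.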
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index a26047d..955e0e5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -202,8 +202,11 @@ public Path getFinalDirName() {
   /** getFinalDirName that takes into account MM, but not DP, LB or buckets. */
   public Path getMergeInputDirName() {
     Path root = getFinalDirName();
-    if (mmWriteId == null) return root;
-    return new Path(root, AcidUtils.deltaSubdir(txnId, txnId, 0));
+    if (isMmTable()) {
+      return new Path(root, AcidUtils.deltaSubdir(txnId, txnId, 0));
+    } else {
+      return root;
+    }
   }
 
   @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
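For the FileSinkDesc change, a minimal sketch of the resulting behavior, assuming isMmTable() is true exactly when the sink writes an insert-only table (statement id 0, as in the patched code):

import org.apache.hadoop.fs.Path;

// Illustration of getMergeInputDirName() after the patch; not the class itself.
public class MergeInputDirSketch {
  static Path mergeInputDir(Path finalDirName, boolean isMmTable, long txnId) {
    return isMmTable
        ? new Path(finalDirName, String.format("delta_%07d_%07d_%04d", txnId, txnId, 0)) // AcidUtils.deltaSubdir(txnId, txnId, 0)
        : finalDirName;
  }
}

So merge tasks read from the delta directory for MM tables and directly from the final directory otherwise.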