diff --git common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java index 4cbeb89..07b79a7 100644 --- common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java +++ common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java @@ -30,7 +30,10 @@ public class ValidWriteIds { public static final ValidWriteIds NO_WRITE_IDS = new ValidWriteIds(-1, -1, false, null); - public static final String MM_PREFIX = "mm"; + public static final String DELTA_PREFIX = "delta"; + public static final String DELTA_DIGITS = "%07d"; + public static final int DELTA_DIGITS_LEN = 7; + public static final String STATEMENT_DIGITS = "%04d"; private static final String CURRENT_SUFFIX = ".current"; private final static Logger LOG = LoggerFactory.getLogger(ValidWriteIds.class); @@ -151,19 +154,15 @@ public boolean isValid(long writeId) { return ids != null && (areIdsValid == ids.contains(writeId)); } - public static String getMmFilePrefix(long mmWriteId) { - return MM_PREFIX + "_" + mmWriteId; - } - - public static class IdPathFilter implements PathFilter { private final String mmDirName; private final boolean isMatch, isIgnoreTemp; - public IdPathFilter(long writeId, boolean isMatch) { - this(writeId, isMatch, false); + public IdPathFilter(long txnId, int stmtId, boolean isMatch) { + this(txnId, stmtId, isMatch, false); } - public IdPathFilter(long writeId, boolean isMatch, boolean isIgnoreTemp) { - this.mmDirName = ValidWriteIds.getMmFilePrefix(writeId); + public IdPathFilter(long txnId, int stmtId, boolean isMatch, boolean isIgnoreTemp) { + this.mmDirName = DELTA_PREFIX + "_" + String.format(DELTA_DIGITS, txnId) + "_" + + String.format(DELTA_DIGITS, txnId) + "_" + String.format(STATEMENT_DIGITS, stmtId); this.isMatch = isMatch; this.isIgnoreTemp = isIgnoreTemp; } @@ -186,8 +185,8 @@ public boolean accept(Path path) { @Override public boolean accept(Path path) { String name = path.getName(); - if (!name.startsWith(MM_PREFIX + "_")) return false; - String idStr = name.substring(MM_PREFIX.length() + 1); + if (!name.startsWith(DELTA_PREFIX + "_")) return false; + String idStr = name.substring(DELTA_PREFIX.length() + 1, DELTA_PREFIX.length() + 1 + DELTA_DIGITS_LEN); try { Long.parseLong(idStr); } catch (NumberFormatException ex) { @@ -198,8 +197,8 @@ public boolean accept(Path path) { } public static Long extractWriteId(Path file) { String fileName = file.getName(); - String[] parts = fileName.split("_", 3); - if (parts.length < 2 || !MM_PREFIX.equals(parts[0])) { + String[] parts = fileName.split("_", 4); // e.g. 
delta_0000001_0000001_0000 + if (parts.length < 4 || !DELTA_PREFIX.equals(parts[0])) { LOG.info("Cannot extract write ID for a MM table: " + file + " (" + Arrays.toString(parts) + ")"); return null; diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java index 0c51a68..e5ce521 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java @@ -103,7 +103,7 @@ protected void setUp() { db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true); db.createTable(src, cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class); - db.loadTable(hadoopDataFile[i], src, false, false, false, false, false, null); +// db.loadTable(hadoopDataFile[i], src, false, false, false, false, false, null); i++; } diff --git metastore/if/hive_metastore.thrift metastore/if/hive_metastore.thrift index a0c6e59..66a6011 100755 --- metastore/if/hive_metastore.thrift +++ metastore/if/hive_metastore.thrift @@ -139,8 +139,10 @@ enum DataOperationType { INSERT = 2 UPDATE = 3, DELETE = 4, - UNSET = 5,//this is the default to distinguish from NULL from old clients - NO_TXN = 6,//drop table, insert overwrite, etc - something non-transactional + INSERT_OVERWRITE = 5, + CTAS = 6, + UNSET = 7,//this is the default to distinguish from NULL from old clients + NO_TXN = 8,//drop table, insert overwrite, etc - something non-transactional } // Types of events the client can request that the metastore fire. For now just support DML operations, as the metastore knows diff --git metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index 96c2b0b..f5e47b9 100644 --- metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -124,6 +124,8 @@ int _kDataOperationTypeValues[] = { DataOperationType::INSERT, DataOperationType::UPDATE, DataOperationType::DELETE, + DataOperationType::INSERT_OVERWRITE, + DataOperationType::CTAS, DataOperationType::UNSET, DataOperationType::NO_TXN }; @@ -132,10 +134,12 @@ const char* _kDataOperationTypeNames[] = { "INSERT", "UPDATE", "DELETE", + "INSERT_OVERWRITE", + "CTAS", "UNSET", "NO_TXN" }; -const std::map _DataOperationType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(6, _kDataOperationTypeValues, _kDataOperationTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map _DataOperationType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(8, _kDataOperationTypeValues, _kDataOperationTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); int _kEventRequestTypeValues[] = { EventRequestType::INSERT, diff --git metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h index 3f2fa56..ce82564 100644 --- metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -115,8 +115,10 @@ struct DataOperationType { INSERT = 2, UPDATE = 3, DELETE = 4, - UNSET = 5, - NO_TXN = 6 + INSERT_OVERWRITE = 5, + CTAS = 6, + UNSET = 7, + NO_TXN = 8 }; }; @@ -5253,8 +5255,8 @@ class LockComponent { LockComponent(const LockComponent&); LockComponent& operator=(const LockComponent&); - LockComponent() : type((LockType::type)0), level((LockLevel::type)0), dbname(), tablename(), partitionname(), 
operationType((DataOperationType::type)5), isAcid(false), isDynamicPartitionWrite(false) { - operationType = (DataOperationType::type)5; + LockComponent() : type((LockType::type)0), level((LockLevel::type)0), dbname(), tablename(), partitionname(), operationType((DataOperationType::type)7), isAcid(false), isDynamicPartitionWrite(false) { + operationType = (DataOperationType::type)7; } @@ -6310,8 +6312,8 @@ class AddDynamicPartitions { AddDynamicPartitions(const AddDynamicPartitions&); AddDynamicPartitions& operator=(const AddDynamicPartitions&); - AddDynamicPartitions() : txnid(0), dbname(), tablename(), operationType((DataOperationType::type)5) { - operationType = (DataOperationType::type)5; + AddDynamicPartitions() : txnid(0), dbname(), tablename(), operationType((DataOperationType::type)7) { + operationType = (DataOperationType::type)7; } diff --git metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/DataOperationType.java metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/DataOperationType.java index 15a6e9a..0343662 100644 --- metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/DataOperationType.java +++ metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/DataOperationType.java @@ -16,8 +16,10 @@ INSERT(2), UPDATE(3), DELETE(4), - UNSET(5), - NO_TXN(6); + INSERT_OVERWRITE(5), + CTAS(6), + UNSET(7), + NO_TXN(8); private final int value; @@ -47,8 +49,12 @@ public static DataOperationType findByValue(int value) { case 4: return DELETE; case 5: - return UNSET; + return INSERT_OVERWRITE; case 6: + return CTAS; + case 7: + return UNSET; + case 8: return NO_TXN; default: return null; diff --git metastore/src/gen/thrift/gen-php/metastore/Types.php metastore/src/gen/thrift/gen-php/metastore/Types.php index 4dcfc76..14d77c3 100644 --- metastore/src/gen/thrift/gen-php/metastore/Types.php +++ metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -119,15 +119,19 @@ final class DataOperationType { const INSERT = 2; const UPDATE = 3; const DELETE = 4; - const UNSET = 5; - const NO_TXN = 6; + const INSERT_OVERWRITE = 5; + const CTAS = 6; + const UNSET = 7; + const NO_TXN = 8; static public $__names = array( 1 => 'SELECT', 2 => 'INSERT', 3 => 'UPDATE', 4 => 'DELETE', - 5 => 'UNSET', - 6 => 'NO_TXN', + 5 => 'INSERT_OVERWRITE', + 6 => 'CTAS', + 7 => 'UNSET', + 8 => 'NO_TXN', ); } @@ -12792,7 +12796,7 @@ class LockComponent { /** * @var int */ - public $operationType = 5; + public $operationType = 7; /** * @var bool */ @@ -15391,7 +15395,7 @@ class AddDynamicPartitions { /** * @var int */ - public $operationType = 5; + public $operationType = 7; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { diff --git metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 9ac1974..b0c0f12 100644 --- metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -173,16 +173,20 @@ class DataOperationType: INSERT = 2 UPDATE = 3 DELETE = 4 - UNSET = 5 - NO_TXN = 6 + INSERT_OVERWRITE = 5 + CTAS = 6 + UNSET = 7 + NO_TXN = 8 _VALUES_TO_NAMES = { 1: "SELECT", 2: "INSERT", 3: "UPDATE", 4: "DELETE", - 5: "UNSET", - 6: "NO_TXN", + 5: "INSERT_OVERWRITE", + 6: "CTAS", + 7: "UNSET", + 8: "NO_TXN", } _NAMES_TO_VALUES = { @@ -190,8 +194,10 @@ class DataOperationType: "INSERT": 2, "UPDATE": 3, "DELETE": 4, - "UNSET": 5, - "NO_TXN": 6, + "INSERT_OVERWRITE": 5, + "CTAS": 6, + "UNSET": 7, + "NO_TXN": 
8, } class EventRequestType: @@ -8855,7 +8861,7 @@ class LockComponent: (3, TType.STRING, 'dbname', None, None, ), # 3 (4, TType.STRING, 'tablename', None, None, ), # 4 (5, TType.STRING, 'partitionname', None, None, ), # 5 - (6, TType.I32, 'operationType', None, 5, ), # 6 + (6, TType.I32, 'operationType', None, 7, ), # 6 (7, TType.BOOL, 'isAcid', None, False, ), # 7 (8, TType.BOOL, 'isDynamicPartitionWrite', None, False, ), # 8 ) @@ -10691,7 +10697,7 @@ class AddDynamicPartitions: (2, TType.STRING, 'dbname', None, None, ), # 2 (3, TType.STRING, 'tablename', None, None, ), # 3 (4, TType.LIST, 'partitionnames', (TType.STRING,None), None, ), # 4 - (5, TType.I32, 'operationType', None, 5, ), # 5 + (5, TType.I32, 'operationType', None, 7, ), # 5 ) def __init__(self, txnid=None, dbname=None, tablename=None, partitionnames=None, operationType=thrift_spec[5][4],): diff --git metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index da24113..cf32009 100644 --- metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -84,10 +84,12 @@ module DataOperationType INSERT = 2 UPDATE = 3 DELETE = 4 - UNSET = 5 - NO_TXN = 6 - VALUE_MAP = {1 => "SELECT", 2 => "INSERT", 3 => "UPDATE", 4 => "DELETE", 5 => "UNSET", 6 => "NO_TXN"} - VALID_VALUES = Set.new([SELECT, INSERT, UPDATE, DELETE, UNSET, NO_TXN]).freeze + INSERT_OVERWRITE = 5 + CTAS = 6 + UNSET = 7 + NO_TXN = 8 + VALUE_MAP = {1 => "SELECT", 2 => "INSERT", 3 => "UPDATE", 4 => "DELETE", 5 => "INSERT_OVERWRITE", 6 => "CTAS", 7 => "UNSET", 8 => "NO_TXN"} + VALID_VALUES = Set.new([SELECT, INSERT, UPDATE, DELETE, INSERT_OVERWRITE, CTAS, UNSET, NO_TXN]).freeze end module EventRequestType @@ -1988,7 +1990,7 @@ class LockComponent DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'}, TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tablename', :optional => true}, PARTITIONNAME => {:type => ::Thrift::Types::STRING, :name => 'partitionname', :optional => true}, - OPERATIONTYPE => {:type => ::Thrift::Types::I32, :name => 'operationType', :default => 5, :optional => true, :enum_class => ::DataOperationType}, + OPERATIONTYPE => {:type => ::Thrift::Types::I32, :name => 'operationType', :default => 7, :optional => true, :enum_class => ::DataOperationType}, ISACID => {:type => ::Thrift::Types::BOOL, :name => 'isAcid', :default => false, :optional => true}, ISDYNAMICPARTITIONWRITE => {:type => ::Thrift::Types::BOOL, :name => 'isDynamicPartitionWrite', :default => false, :optional => true} } @@ -2403,7 +2405,7 @@ class AddDynamicPartitions DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'}, TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tablename'}, PARTITIONNAMES => {:type => ::Thrift::Types::LIST, :name => 'partitionnames', :element => {:type => ::Thrift::Types::STRING}}, - OPERATIONTYPE => {:type => ::Thrift::Types::I32, :name => 'operationType', :default => 5, :optional => true, :enum_class => ::DataOperationType} + OPERATIONTYPE => {:type => ::Thrift::Types::I32, :name => 'operationType', :default => 7, :optional => true, :enum_class => ::DataOperationType} } def struct_fields; FIELDS; end diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index c0518ad..cb107f6 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ 
metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -150,7 +150,7 @@ static private boolean doRetryOnConnPool = false; private enum OpertaionType { - SELECT('s'), INSERT('i'), UPDATE('u'), DELETE('d'); + SELECT('s'), INSERT('i'), UPDATE('u'), DELETE('d'), INSERT_OVERWRITE('o'), CTAS('c'); private final char sqlConst; OpertaionType(char sqlConst) { this.sqlConst = sqlConst; @@ -168,6 +168,10 @@ public static OpertaionType fromString(char sqlConst) { return UPDATE; case 'd': return DELETE; + case 'o': + return INSERT_OVERWRITE; + case 'c': + return CTAS; default: throw new IllegalArgumentException(quoteChar(sqlConst)); } @@ -182,6 +186,10 @@ public static OpertaionType fromDataOperationType(DataOperationType dop) { return OpertaionType.UPDATE; case DELETE: return OpertaionType.DELETE; + case INSERT_OVERWRITE: + return OpertaionType.INSERT_OVERWRITE; + case CTAS: + return OpertaionType.CTAS; default: throw new IllegalArgumentException("Unexpected value: " + dop); } @@ -967,6 +975,7 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc case INSERT: case UPDATE: case DELETE: + case INSERT_OVERWRITE: if(!lc.isSetIsDynamicPartitionWrite()) { //must be old client talking, i.e. we don't know if it's DP so be conservative updateTxnComponents = true; @@ -981,10 +990,11 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc } break; case SELECT: + case CTAS: updateTxnComponents = false; break; default: - //since we have an open transaction, only 4 values above are expected + //since we have an open transaction, only 6 values above are expected throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType() + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index f01c3d5..7ddb27f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -167,6 +167,9 @@ // whether any ACID table is involved in a query private boolean acidInQuery; + // A list of FileSinkOperators writing in an MM compliant manner + private Set mmSinks; + // A limit on the number of threads that can be launched private int maxthreads; private int tryCount = Integer.MAX_VALUE; @@ -520,6 +523,9 @@ public void run() { // them later. 
acidSinks = sem.getAcidFileSinks(); + // Record any MM compliant FileSinkOperators too, since we also add transaction ID to them + mmSinks = sem.getMmFileSinks(); + LOG.info("Semantic Analysis Completed"); // validate the plan @@ -1118,7 +1124,7 @@ private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) { boolean initiatingTransaction = false; boolean readOnlyQueryInAutoCommit = false; if((txnMgr.getAutoCommit() && haveAcidWrite()) || plan.getOperation() == HiveOperation.START_TRANSACTION || - (!txnMgr.getAutoCommit() && startTxnImplicitly)) { + (!txnMgr.getAutoCommit() && startTxnImplicitly) || (txnMgr.getAutoCommit() && haveMmWrite())) { if(txnMgr.isTxnOpen()) { throw new RuntimeException("Already have an open transaction txnid:" + txnMgr.getCurrentTxnId()); } @@ -1138,6 +1144,13 @@ private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) { desc.setStatementId(txnMgr.getWriteIdAndIncrement()); } } + // Set the transaction id in all of the mm file sinks + if (haveMmWrite()) { + for (FileSinkDesc desc : mmSinks) { + desc.setTransactionId(txnMgr.getCurrentTxnId()); + desc.setStatementId(txnMgr.getWriteIdAndIncrement()); + } + } /*Note, we have to record snapshot after lock acquisition to prevent lost update problem consider 2 concurrent "update table T set x = x + 1". 1st will get the locks and the 2nd will block until 1st one commits and only then lock in the snapshot, i.e. it will @@ -1169,6 +1182,11 @@ private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) { private boolean haveAcidWrite() { return acidSinks != null && !acidSinks.isEmpty(); } + + private boolean haveMmWrite() { + return mmSinks != null && !mmSinks.isEmpty(); + } + /** * @param commit if there is an open transaction and if true, commit, * if false rollback. If there is no open transaction this parameter is ignored. @@ -1529,13 +1547,13 @@ else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) { return rollback(createProcessorResponse(ret)); } } - +/* try { acquireWriteIds(plan, conf); } catch (HiveException e) { return handleHiveException(e, 1); } - +*/ ret = execute(true); if (ret != 0) { //if needRequireLock is false, the release here will do nothing because there is no lock diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java index 1315b99..83041e7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java @@ -89,7 +89,7 @@ public void initializeOp(Configuration hconf) throws HiveException { .isListBucketingAlterTableConcatenate(); listBucketingDepth = conf.getListBucketingDepth(); Path specPath = conf.getOutputPath(); - isMmTable = conf.getMmWriteId() != null; + isMmTable = conf.getTxnId() != null; if (isMmTable) { updatePaths(specPath, null); } else { @@ -246,7 +246,7 @@ public void closeOp(boolean abort) throws HiveException { // There's always just one file that we have merged. // The union/DP/etc. should already be account for in the path. 
Utilities.writeMmCommitManifest(Lists.newArrayList(outPath), - tmpPath.getParent(), fs, taskId, conf.getMmWriteId(), null); + tmpPath.getParent(), fs, taskId, conf.getTxnId(), conf.getStmtId(), null); LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes)."); } } @@ -280,8 +280,9 @@ public void jobCloseOp(Configuration hconf, boolean success) try { Path outputDir = conf.getOutputPath(); FileSystem fs = outputDir.getFileSystem(hconf); - Long mmWriteId = conf.getMmWriteId(); - if (mmWriteId == null) { + Long txnId = conf.getTxnId(); + int stmtId = conf.getStmtId(); + if (txnId == 0) { Path backupPath = backupOutputPath(fs, outputDir); Utilities.mvFileToFinalPath( outputDir, hconf, success, LOG, conf.getDpCtx(), null, reporter); @@ -297,7 +298,7 @@ public void jobCloseOp(Configuration hconf, boolean success) // We don't expect missing buckets from mere (actually there should be no buckets), // so just pass null as bucketing context. Union suffix should also be accounted for. Utilities.handleMmTableFinalPath(outputDir.getParent(), null, hconf, success, - dpLevels, lbLevels, null, mmWriteId, reporter, false); + dpLevels, lbLevels, null, txnId, stmtId, reporter, false); } } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index ca5dce0..50e091b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -3921,7 +3921,7 @@ private boolean isSchemaEvolutionEnabled(Table tbl) { handleRemoveMm(tbl.getDataLocation(), ids, allMmDirs); } List targetPaths = new ArrayList<>(allMmDirs.size()); - int prefixLen = ValidWriteIds.MM_PREFIX.length(); + int prefixLen = ValidWriteIds.DELTA_PREFIX.length(); for (int i = 0; i < allMmDirs.size(); ++i) { Path src = allMmDirs.get(i); Path tgt = new Path(src.getParent(), src.getName().substring(prefixLen + 1)); @@ -3989,9 +3989,12 @@ private static void ensureDelete(FileSystem fs, Path path, String what) throws I // We will move all the files in the table/partition directories into the first MM // directory, then commit the first write ID. List srcs = new ArrayList<>(), tgts = new ArrayList<>(); + SessionState ss = SessionState.get(); + HiveTxnManager txnMgr = ss.getTxnMgr(); + Long txnId = txnMgr.getCurrentTxnId(); + int stmtId = txnMgr.getWriteIdAndIncrement(); + String mmDir = AcidUtils.deltaSubdir(txnId, txnId, stmtId); Hive db = getHive(); - long mmWriteId = db.getNextTableWriteId(tbl.getDbName(), tbl.getTableName()); - String mmDir = ValidWriteIds.getMmFilePrefix(mmWriteId); if (tbl.getPartitionKeys().size() > 0) { PartitionIterable parts = new PartitionIterable(db, tbl, null, HiveConf.getIntVar(conf, ConfVars.METASTORE_BATCH_RETRIEVE_MAX)); @@ -4014,7 +4017,7 @@ private static void ensureDelete(FileSystem fs, Path path, String what) throws I // Don't set inputs and outputs - the locks have already been taken so it's pointless. MoveWork mw = new MoveWork(null, null, null, null, false); mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null)); - ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), mmWriteId); + ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), txnId, stmtId); // TODO# this is hacky and will be gone with ACID. The problem is getting the write ID above // modifies the table, but the table object above is preserved and modified without // getting this change, so saving it will overwrite write ID. 
Ideally, when we save @@ -4022,7 +4025,6 @@ private static void ensureDelete(FileSystem fs, Path path, String what) throws I // There's probably some way in DN to achieve that, but for now let's just update the // original object here. This is safe due to DDL lock and the fact that converting // the table to MM here from non-MM should mean no concurrent write ID updates. - tbl.setMmNextWriteId(mmWriteId + 1); Task mv = TaskFactory.get(mw, conf), ic = TaskFactory.get(icw, conf); mv.addDependentTask(ic); return Lists.>newArrayList(mv); @@ -4434,7 +4436,8 @@ private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException { // CTAS create the table on a directory that already exists; import creates the table // first (in parallel with copies?), then commits after all the loads. if (crtTbl.isCTAS()) { - db.commitMmTableWrite(tbl, initialWriteId); + // todo this shouldn't be needed +// db.commitMmTableWrite(tbl, initialWriteId); } } if (crtTbl.isCTAS()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index da78a99..4742309 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIds; import org.apache.hadoop.hive.conf.HiveConf; @@ -403,12 +404,12 @@ private String processCurrPathForMmWriteIds(InputFormat inputFormat) throws IOEx if (inputFormat instanceof HiveInputFormat) { return StringUtils.escapeString(currPath.toString()); // No need to process here. } - ValidWriteIds ids = extractWriteIdsForCurrentTable(); - if (ids != null) { - Utilities.LOG14535.info("Observing " + currDesc.getTableName() + ": " + ids); + ValidTxnList validTxnList = extractWriteIdsForCurrentTable(); + if (validTxnList != null) { + Utilities.LOG14535.info("Observing " + currDesc.getTableName() + ": " + validTxnList); } - Path[] dirs = HiveInputFormat.processPathsForMmRead(Lists.newArrayList(currPath), job, ids); + Path[] dirs = HiveInputFormat.processPathsForMmRead(Lists.newArrayList(currPath), job, validTxnList); if (dirs == null || dirs.length == 0) { return null; // No valid inputs. This condition is logged inside the call. } @@ -419,11 +420,21 @@ private String processCurrPathForMmWriteIds(InputFormat inputFormat) throws IOEx return str.toString(); } - private ValidWriteIds extractWriteIdsForCurrentTable() { + private ValidTxnList extractWriteIdsForCurrentTable() { if (writeIdMap == null) { writeIdMap = new HashMap(); } - return HiveInputFormat.extractWriteIds(writeIdMap, job, currDesc.getTableName()); + + ValidTxnList validTxnList; + if (org.apache.commons.lang.StringUtils.isBlank(currDesc.getTableName())) { + validTxnList = null; // i.e. not fetching from a table directly but from a temp location + } else { + String txnString = job.get(ValidTxnList.VALID_TXNS_KEY); + validTxnList = txnString == null ? 
new ValidReadTxnList() : + new ValidReadTxnList(txnString); + } + + return validTxnList; } private FetchInputFormatSplit[] splitSampling(SplitSample splitSample, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 37c3a96..9096f0d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -175,6 +175,8 @@ int acidLastBucket = -1; int acidFileOffset = -1; private boolean isMmTable; + private Long txnId; + private int stmtId; public FSPaths(Path specPath, boolean isMmTable) { this.isMmTable = isMmTable; @@ -184,6 +186,8 @@ public FSPaths(Path specPath, boolean isMmTable) { } else { tmpPath = specPath; taskOutputTempPath = null; // Should not be used. + txnId = conf.getTransactionId(); + stmtId = conf.getStatementId(); } Utilities.LOG14535.info("new FSPaths for " + numFiles + " files, dynParts = " + bDynParts + ": tmpPath " + tmpPath + ", task path " + taskOutputTempPath @@ -315,7 +319,7 @@ public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeT } outPaths[filesIdx] = getTaskOutPath(taskId); } else { - String subdirPath = ValidWriteIds.getMmFilePrefix(conf.getMmWriteId()); + String subdirPath = AcidUtils.deltaSubdir(txnId, txnId, stmtId); if (unionPath != null) { // Create the union directory inside the MM directory. subdirPath += Path.SEPARATOR + unionPath; @@ -1182,9 +1186,9 @@ public void closeOp(boolean abort) throws HiveException { fsp.commit(fs, commitPaths); } } - if (conf.getMmWriteId() != null) { - Utilities.writeMmCommitManifest( - commitPaths, specPath, fs, taskId, conf.getMmWriteId(), unionPath); + if (conf.getTxnIdObj() != null) { + Utilities.writeMmCommitManifest(commitPaths, specPath, fs, taskId, + conf.getTxnIdObj(), conf.getStatementId(), unionPath); } // Only publish stats if this operator's flag was set to gather stats if (conf.isGatherStats()) { @@ -1240,7 +1244,7 @@ public void jobCloseOp(Configuration hconf, boolean success) MissingBucketsContext mbc = new MissingBucketsContext( conf.getTableInfo(), numBuckets, conf.getCompressed()); Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success, - dpLevels, lbLevels, mbc, conf.getMmWriteId(), reporter, conf.isMmCtas()); + dpLevels, lbLevels, mbc, conf.getTxnIdObj(), conf.getStatementId(), reporter, conf.isMmCtas()); } } } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java index ba009b9..6cc8c5e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java @@ -35,7 +35,7 @@ public ImportCommitTask() { @Override public int execute(DriverContext driverContext) { - Utilities.LOG14535.info("Executing ImportCommit for " + work.getMmWriteId()); + Utilities.LOG14535.info("Executing ImportCommit for " + work.getTxnId()); try { if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) { @@ -44,7 +44,8 @@ public int execute(DriverContext driverContext) { } Hive db = getHive(); Table tbl = db.getTable(work.getDbName(), work.getTblName()); - db.commitMmTableWrite(tbl, work.getMmWriteId()); + // todo this shouldn't be needed +// db.commitMmTableWrite(tbl, work.getTxnId()); return 0; } catch (Exception e) { console.printError("Failed with exception " + e.getMessage(), "\n" diff --git 
ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java index f62d237..5b59635 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java @@ -26,16 +26,22 @@ public class ImportCommitWork implements Serializable { private static final long serialVersionUID = 1L; private String dbName, tblName; - private long mmWriteId; + private long txnId; + private int stmtId; - public ImportCommitWork(String dbName, String tblName, long mmWriteId) { - this.mmWriteId = mmWriteId; + public ImportCommitWork(String dbName, String tblName, long txnId, int stmtId) { + this.txnId = txnId; + this.stmtId = stmtId; this.dbName = dbName; this.tblName = tblName; } - public long getMmWriteId() { - return mmWriteId; + public long getTxnId() { + return txnId; + } + + public int getStmtId() { + return stmtId; } public String getDbName() { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 51a8a79..f366aa6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -304,6 +304,8 @@ public int execute(DriverContext driverContext) { // Next we do this for tables and partitions LoadTableDesc tbd = work.getLoadTableWork(); if (tbd != null) { + Long txnId = tbd.getTxnId(); + int stmtId = tbd.getStmtId(); StringBuilder mesg = new StringBuilder("Loading data to table ") .append( tbd.getTable().getTableName()); if (tbd.getPartitionSpec().size() > 0) { @@ -339,7 +341,7 @@ public int execute(DriverContext driverContext) { } db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getReplace(), work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isAcid, hasFollowingStatsTask(), - tbd.getMmWriteId()); + txnId, stmtId); if (work.getOutputs() != null) { DDLTask.addIfAbsentByName(new WriteEntity(table, getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs()); @@ -392,6 +394,8 @@ public int execute(DriverContext driverContext) { private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd, TaskInformation ti) throws HiveException, IOException, InvalidOperationException { + Long txnId = tbd.getTxnId(); + int stmtId = tbd.getStmtId(); List partVals = MetaStoreUtils.getPvals(table.getPartCols(), tbd.getPartitionSpec()); db.validatePartitionNameCharacters(partVals); Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath() @@ -400,9 +404,8 @@ private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd, db.loadSinglePartition(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getPartitionSpec(), tbd.getReplace(), tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(), - (work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID && - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.INSERT_ONLY), - hasFollowingStatsTask(), tbd.getMmWriteId(), isCommitMmWrite); + work.getLoadTableWork().getWriteType(), + hasFollowingStatsTask(), txnId, stmtId, isCommitMmWrite); Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); if (ti.bucketCols != null || ti.sortCols != null) { @@ -437,6 +440,9 @@ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd, if (tbd.isMmTable() && !tbd.isCommitMmWrite()) { throw new HiveException("Only single-partition LoadTableDesc can skip commiting 
write ID"); } + + Long txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); + int stmtId = tbd.getStmtId(); Map, Partition> dp = db.loadDynamicPartitions( tbd.getSourcePath(), @@ -445,11 +451,8 @@ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd, tbd.getReplace(), dpCtx.getNumDPCols(), (tbd.getLbCtx() == null) ? 0 : tbd.getLbCtx().calculateListBucketingLevel(), - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID && - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.INSERT_ONLY, - SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(), - work.getLoadTableWork().getWriteType(), - tbd.getMmWriteId()); + txnId, stmtId, hasFollowingStatsTask(), + work.getLoadTableWork().getWriteType()); // publish DP columns to its subscribers if (dps != null && dps.size() > 0) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 8e83bb4..c43bc1c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1589,7 +1589,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), numBuckets = (conf != null && conf.getTable() != null) ? conf.getTable().getNumBuckets() : 0; - return removeTempOrDuplicateFiles(fs, fileStats, dpLevels, numBuckets, hconf, null); + return removeTempOrDuplicateFiles(fs, fileStats, dpLevels, numBuckets, hconf, null, 0); } private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException { @@ -1605,7 +1605,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I } public static List removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, - int dpLevels, int numBuckets, Configuration hconf, Long mmWriteId) throws IOException { + int dpLevels, int numBuckets, Configuration hconf, Long txnId, int stmtId) throws IOException { if (fileStats == null) { return null; } @@ -1624,9 +1624,9 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I } FileStatus[] items = fs.listStatus(path); - if (mmWriteId != null) { + if (txnId != null) { Path mmDir = parts[i].getPath(); - if (!mmDir.getName().equals(ValidWriteIds.getMmFilePrefix(mmWriteId))) { + if (!mmDir.getName().equals(AcidUtils.deltaSubdir(txnId, txnId, stmtId))) { throw new IOException("Unexpected non-MM directory name " + mmDir); } Utilities.LOG14535.info("removeTempOrDuplicateFiles processing files in MM directory " + mmDir); @@ -1641,14 +1641,14 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I if (items.length == 0) { return result; } - if (mmWriteId == null) { + if (txnId == null) { taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs); } else { if (items.length > 1) { throw new IOException("Unexpected directories for non-DP MM: " + Arrays.toString(items)); } Path mmDir = items[0].getPath(); - if (!mmDir.getName().equals(ValidWriteIds.getMmFilePrefix(mmWriteId))) { + if (!mmDir.getName().equals(AcidUtils.deltaSubdir(txnId, txnId, stmtId))) { throw new IOException("Unexpected non-MM directory " + mmDir); } Utilities.LOG14535.info( @@ -3994,10 +3994,10 @@ private static void tryDelete(FileSystem fs, Path path) { } public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels, - int lbLevels, PathFilter filter, long mmWriteId, Configuration conf) throws IOException { + int 
lbLevels, PathFilter filter, long txnId, int stmtId, Configuration conf) throws IOException { int skipLevels = dpLevels + lbLevels; if (filter == null) { - filter = new ValidWriteIds.IdPathFilter(mmWriteId, true); + filter = new ValidWriteIds.IdPathFilter(txnId, stmtId, true); } if (skipLevels == 0) { return statusToPath(fs.listStatus(path, filter)); @@ -4005,7 +4005,7 @@ private static void tryDelete(FileSystem fs, Path path) { if (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs)) { return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter); } - return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, mmWriteId); + return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, txnId, stmtId); } private static boolean isS3(FileSystem fs) { @@ -4073,12 +4073,12 @@ private static boolean isS3(FileSystem fs) { } private static Path[] getMmDirectoryCandidatesGlobStatus(FileSystem fs, - Path path, int skipLevels, PathFilter filter, long mmWriteId) throws IOException { + Path path, int skipLevels, PathFilter filter, long txnId, int stmtId) throws IOException { StringBuilder sb = new StringBuilder(path.toUri().getPath()); for (int i = 0; i < skipLevels; i++) { sb.append(Path.SEPARATOR).append("*"); } - sb.append(Path.SEPARATOR).append(ValidWriteIds.getMmFilePrefix(mmWriteId)); + sb.append(Path.SEPARATOR).append(AcidUtils.deltaSubdir(txnId, txnId, stmtId)); Path pathPattern = new Path(path, sb.toString()); Utilities.LOG14535.info("Looking for files via: " + pathPattern); return statusToPath(fs.globStatus(pathPattern, filter)); @@ -4086,9 +4086,9 @@ private static boolean isS3(FileSystem fs) { private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir, int dpLevels, int lbLevels, String unionSuffix, ValidWriteIds.IdPathFilter filter, - long mmWriteId, Configuration conf) throws IOException { + long txnId, int stmtId, Configuration conf) throws IOException { Path[] files = getMmDirectoryCandidates( - fs, specPath, dpLevels, lbLevels, filter, mmWriteId, conf); + fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, conf); if (files != null) { for (Path path : files) { Utilities.LOG14535.info("Deleting " + path + " on failure"); @@ -4101,10 +4101,10 @@ private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manif public static void writeMmCommitManifest(List commitPaths, Path specPath, FileSystem fs, - String taskId, Long mmWriteId, String unionSuffix) throws HiveException { + String taskId, Long txnId, int stmtId, String unionSuffix) throws HiveException { if (commitPaths.isEmpty()) return; // We assume one FSOP per task (per specPath), so we create it in specPath. - Path manifestPath = getManifestDir(specPath, mmWriteId, unionSuffix); + Path manifestPath = getManifestDir(specPath, txnId, stmtId, unionSuffix); manifestPath = new Path(manifestPath, taskId + MANIFEST_EXTENSION); Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths); try { @@ -4123,8 +4123,8 @@ public static void writeMmCommitManifest(List commitPaths, Path specPath, } } - private static Path getManifestDir(Path specPath, long mmWriteId, String unionSuffix) { - Path manifestPath = new Path(specPath, "_tmp." + ValidWriteIds.getMmFilePrefix(mmWriteId)); + private static Path getManifestDir(Path specPath, long txnId, int stmtId, String unionSuffix) { + Path manifestPath = new Path(specPath, "_tmp." + AcidUtils.deltaSubdir(txnId, txnId, stmtId)); return (unionSuffix == null) ? 
manifestPath : new Path(manifestPath, unionSuffix); } @@ -4140,18 +4140,18 @@ public MissingBucketsContext(TableDesc tableInfo, int numBuckets, boolean isComp } public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf, - boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long mmWriteId, + boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long txnId, int stmtId, Reporter reporter, boolean isMmCtas) throws IOException, HiveException { FileSystem fs = specPath.getFileSystem(hconf); - Path manifestDir = getManifestDir(specPath, mmWriteId, unionSuffix); + Path manifestDir = getManifestDir(specPath, txnId, stmtId, unionSuffix); if (!success) { - ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true); + ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(txnId, stmtId, true); tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels, - unionSuffix, filter, mmWriteId, hconf); + unionSuffix, filter, txnId, stmtId, hconf); return; } - Utilities.LOG14535.info("Looking for manifests in: " + manifestDir + " (" + mmWriteId + ")"); + Utilities.LOG14535.info("Looking for manifests in: " + manifestDir + " (" + txnId + ")"); // TODO# may be wrong if there are no splits (empty insert/CTAS) List manifests = new ArrayList<>(); if (fs.exists(manifestDir)) { @@ -4171,14 +4171,14 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con } Utilities.LOG14535.info("Looking for files in: " + specPath); - ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true); + ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(txnId, stmtId, true); if (isMmCtas && !fs.exists(specPath)) { // TODO: do we also need to do this when creating an empty partition from select? Utilities.LOG14535.info("Creating table directory for CTAS with no output at " + specPath); FileUtils.mkdir(fs, specPath, hconf); } Path[] files = getMmDirectoryCandidates( - fs, specPath, dpLevels, lbLevels, filter, mmWriteId, hconf); + fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, hconf); ArrayList mmDirectories = new ArrayList<>(); if (files != null) { for (Path path : files) { @@ -4234,7 +4234,7 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con finalResults[i] = new PathOnlyFileStatus(mmDirectories.get(i)); } List emptyBuckets = Utilities.removeTempOrDuplicateFiles( - fs, finalResults, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, mmWriteId); + fs, finalResults, dpLevels, mbc == null ? 
0 : mbc.numBuckets, hconf, txnId, stmtId); // create empty buckets if necessary if (emptyBuckets.size() > 0) { assert mbc != null; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 740488c..cba0434 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -278,7 +278,7 @@ static long parseBase(Path path) { // INSERT_ONLY is a special operation which we only support INSERT operations, no UPDATE/DELETE public enum Operation { - NOT_ACID, INSERT, UPDATE, DELETE, INSERT_ONLY + NOT_ACID, INSERT, UPDATE, DELETE, INSERT_OVERWRITE, CTAS, INSERT_ONLY } /** @@ -297,11 +297,17 @@ public static DataOperationType toDataOperationType(Operation op) { return DataOperationType.UPDATE; case DELETE: return DataOperationType.DELETE; + case INSERT_ONLY: + return DataOperationType.INSERT_OVERWRITE; default: throw new IllegalArgumentException("Unexpected Operation: " + op); } } + public static boolean isAcidType(Operation operation) { + return operation == Operation.INSERT || operation == Operation.UPDATE || operation == Operation.DELETE; + } + public enum AcidBaseFileType { COMPACTED_BASE, // a regular base file generated through major compaction ORIGINAL_BASE, // a non-acid schema file for tables that got converted to acid diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 5ea3cec..26a8e08 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -35,7 +35,10 @@ import java.util.Map.Entry; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configurable; @@ -414,10 +417,9 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job InputFormat inputFormat, Class inputFormatClass, int splits, TableDesc table, Map writeIdMap, List result) throws IOException { - ValidWriteIds writeIds = extractWriteIds(writeIdMap, conf, table.getTableName()); - if (writeIds != null) { - Utilities.LOG14535.info("Observing " + table.getTableName() + ": " + writeIds); - } + String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); + ValidTxnList validTxnList = txnString == null ? new ValidReadTxnList() : + new ValidReadTxnList(txnString); Utilities.copyTablePropertiesToConf(table, conf); @@ -425,7 +427,7 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job pushFilters(conf, tableScan); } - Path[] finalDirs = processPathsForMmRead(dirs, conf, writeIds); + Path[] finalDirs = processPathsForMmRead(dirs, conf, validTxnList); if (finalDirs == null) { return; // No valid inputs. 
} @@ -450,13 +452,13 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job } public static Path[] processPathsForMmRead(List dirs, JobConf conf, - ValidWriteIds writeIds) throws IOException { - if (writeIds == null) { + ValidTxnList validTxnList) throws IOException { + if (validTxnList == null) { return dirs.toArray(new Path[dirs.size()]); } else { List finalPaths = new ArrayList<>(dirs.size()); for (Path dir : dirs) { - processForWriteIds(dir, conf, writeIds, finalPaths); + processForWriteIds(dir, conf, validTxnList, finalPaths); } if (finalPaths.isEmpty()) { LOG.warn("No valid inputs found in " + dirs); @@ -467,7 +469,7 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job } private static void processForWriteIds(Path dir, JobConf conf, - ValidWriteIds writeIds, List finalPaths) throws IOException { + ValidTxnList validTxnList, List finalPaths) throws IOException { FileSystem fs = dir.getFileSystem(conf); Utilities.LOG14535.warn("Checking " + dir + " (root) for inputs"); // Ignore nullscan-optimized paths. @@ -478,17 +480,17 @@ private static void processForWriteIds(Path dir, JobConf conf, FileStatus[] files = fs.listStatus(dir); // TODO: batch? LinkedList subdirs = new LinkedList<>(); for (FileStatus file : files) { - handleNonMmDirChild(file, writeIds, subdirs, finalPaths); + handleNonMmDirChild(file, validTxnList, subdirs, finalPaths); } while (!subdirs.isEmpty()) { Path subdir = subdirs.poll(); for (FileStatus file : fs.listStatus(subdir)) { - handleNonMmDirChild(file, writeIds, subdirs, finalPaths); + handleNonMmDirChild(file, validTxnList, subdirs, finalPaths); } } } - private static void handleNonMmDirChild(FileStatus file, ValidWriteIds writeIds, + private static void handleNonMmDirChild(FileStatus file, ValidTxnList validTxnList, LinkedList subdirs, List finalPaths) { Path path = file.getPath(); Utilities.LOG14535.warn("Checking " + path + " for inputs"); @@ -501,7 +503,7 @@ private static void handleNonMmDirChild(FileStatus file, ValidWriteIds writeIds, subdirs.add(path); return; } - if (!writeIds.isValid(writeId)) { + if (!validTxnList.isTxnValid(writeId)) { Utilities.LOG14535.warn("Ignoring an uncommitted directory " + path); return; } @@ -635,7 +637,7 @@ private static void handleNonMmDirChild(FileStatus file, ValidWriteIds writeIds, perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); return result.toArray(new HiveInputSplit[result.size()]); } - + // todo should deprecate public static ValidWriteIds extractWriteIds(Map writeIdMap, JobConf newjob, String tableName) { if (StringUtils.isBlank(tableName)) return null; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index d255265..19a59a9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -266,7 +267,7 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB case DDL_EXCLUSIVE: case INSERT_OVERWRITE: compBuilder.setExclusive(); - compBuilder.setOperationType(DataOperationType.NO_TXN); + 
compBuilder.setOperationType(DataOperationType.INSERT_OVERWRITE); break; case INSERT: @@ -287,7 +288,11 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB break; case DDL_SHARED: compBuilder.setShared(); - compBuilder.setOperationType(DataOperationType.NO_TXN); + if (plan.getOperation() == HiveOperation.CREATETABLE_AS_SELECT) { + compBuilder.setOperationType(DataOperationType.CTAS); + } else { + compBuilder.setOperationType(DataOperationType.NO_TXN); + } break; case UPDATE: diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 2e157ad..c8d3d62 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1580,21 +1580,22 @@ public Database getDatabaseCurrent() throws HiveException { public void loadSinglePartition(Path loadPath, String tableName, Map partSpec, boolean replace, boolean inheritTableSpecs, - boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid, - boolean hasFollowingStatsTask, Long mmWriteId, boolean isCommitMmWrite) + boolean isSkewedStoreAsSubdir, boolean isSrcLocal, AcidUtils.Operation operationType, + boolean hasFollowingStatsTask, Long txnId, int stmtId, boolean isCommitMmWrite) throws HiveException { Table tbl = getTable(tableName); - boolean isMmTableWrite = (mmWriteId != null); + boolean isMmTableWrite = (txnId != null); Preconditions.checkState(isMmTableWrite == MetaStoreUtils.isInsertOnlyTable(tbl.getParameters())); loadPartition(loadPath, tbl, partSpec, replace, inheritTableSpecs, - isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, mmWriteId); + isSkewedStoreAsSubdir, isSrcLocal, operationType, hasFollowingStatsTask, txnId, stmtId); if (isMmTableWrite && isCommitMmWrite) { // The assumption behind committing here is that this partition is the only one outputted. - commitMmTableWrite(tbl, mmWriteId); + // todo this shouldn't be needed +// commitMmTableWrite(tbl, txnId); } } - + // todo should deprecate public void commitMmTableWrite(Table tbl, Long mmWriteId) throws HiveException { try { @@ -1623,11 +1624,11 @@ public void commitMmTableWrite(Table tbl, Long mmWriteId) * location/inputformat/outputformat/serde details from table spec * @param isSrcLocal * If the source directory is LOCAL - * @param isAcid true if this is an ACID operation + * @param operationType operation type */ public Partition loadPartition(Path loadPath, Table tbl, Map partSpec, boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, - boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, Long mmWriteId) + boolean isSrcLocal, AcidUtils.Operation operationType, boolean hasFollowingStatsTask, Long txnId, int stmtId) throws HiveException { Path tblDataLocationPath = tbl.getDataLocation(); try { @@ -1669,39 +1670,39 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin("MoveTask", "FileMoves"); // TODO: this assumes both paths are qualified; which they are, currently. - if (mmWriteId != null && loadPath.equals(newPartPath)) { + if (txnId != null && loadPath.equals(newPartPath)) { // MM insert query, move itself is a no-op. 
Utilities.LOG14535.info("not moving " + loadPath + " to " + newPartPath + " (MM)"); - assert !isAcid; + assert !AcidUtils.isAcidType(operationType); if (areEventsForDmlNeeded(tbl, oldPart)) { - newFiles = listFilesCreatedByQuery(loadPath, mmWriteId); + newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId); } Utilities.LOG14535.info("maybe deleting stuff from " + oldPartPath + " (new " + newPartPath + ") for replace"); if (replace && oldPartPath != null) { deleteOldPathForReplace(newPartPath, oldPartPath, getConf(), - new ValidWriteIds.IdPathFilter(mmWriteId, false, true), mmWriteId != null, + new ValidWriteIds.IdPathFilter(txnId, stmtId, false, true), txnId != null, tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0); } } else { // Either a non-MM query, or a load into MM table from an external source. PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER; Path destPath = newPartPath; - if (mmWriteId != null) { + if (txnId != null) { // We will load into MM directory, and delete from the parent if needed. - destPath = new Path(destPath, ValidWriteIds.getMmFilePrefix(mmWriteId)); - filter = replace ? new ValidWriteIds.IdPathFilter(mmWriteId, false, true) : filter; + destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); + filter = replace ? new ValidWriteIds.IdPathFilter(txnId, stmtId, false, true) : filter; } Utilities.LOG14535.info("moving " + loadPath + " to " + destPath); - if (replace || (oldPart == null && !isAcid)) { + if (replace || (oldPart == null && !AcidUtils.isAcidType(operationType))) { replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), - isSrcLocal, filter, mmWriteId != null); + isSrcLocal, filter, txnId != null); } else { if (areEventsForDmlNeeded(tbl, oldPart)) { newFiles = Collections.synchronizedList(new ArrayList()); } FileSystem fs = tbl.getDataLocation().getFileSystem(conf); - Hive.copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid, newFiles); + Hive.copyFiles(conf, loadPath, destPath, fs, isSrcLocal, AcidUtils.isAcidType(operationType), newFiles); } } perfLogger.PerfLogEnd("MoveTask", "FileMoves"); @@ -1779,9 +1780,9 @@ private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) { return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null; } - private List listFilesCreatedByQuery(Path loadPath, long mmWriteId) throws HiveException { + private List listFilesCreatedByQuery(Path loadPath, Long txnId, int stmtId) throws HiveException { List newFiles = new ArrayList(); - final String filePrefix = ValidWriteIds.getMmFilePrefix(mmWriteId); + final String filePrefix = AcidUtils.deltaSubdir(txnId, txnId, stmtId); FileStatus[] srcs; FileSystem srcFs; try { @@ -1944,11 +1945,11 @@ private void constructOneLBLocationMap(FileStatus fSta, * @throws HiveException */ private Set getValidPartitionsInPath( - int numDP, int numLB, Path loadPath, Long mmWriteId) throws HiveException { + int numDP, int numLB, Path loadPath, Long txnId, int stmtId) throws HiveException { Set validPartitions = new HashSet(); try { FileSystem fs = loadPath.getFileSystem(conf); - if (mmWriteId == null) { + if (txnId == null) { FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs); // Check for empty partitions for (FileStatus s : leafStatus) { @@ -1963,7 +1964,7 @@ private void constructOneLBLocationMap(FileStatus fSta, // The non-MM path only finds new partitions, as it is looking at the temp path. 
// To produce the same effect, we will find all the partitions affected by this write ID. Path[] leafStatus = Utilities.getMmDirectoryCandidates( - fs, loadPath, numDP, numLB, null, mmWriteId, conf); + fs, loadPath, numDP, numLB, null, txnId, stmtId, conf); for (Path p : leafStatus) { Path dpPath = p.getParent(); // Skip the MM directory that we have found. for (int i = 0; i < numLB; ++i) { @@ -2009,8 +2010,8 @@ private void constructOneLBLocationMap(FileStatus fSta, */ public Map, Partition> loadDynamicPartitions(final Path loadPath, final String tableName, final Map partSpec, final boolean replace, - final int numDP, final int numLB, final boolean isAcid, final long txnId, - final boolean hasFollowingStatsTask, final AcidUtils.Operation operation, final Long mmWriteId) + final int numDP, final int numLB, final Long txnId, final int stmtId, + final boolean hasFollowingStatsTask, final AcidUtils.Operation operation) throws HiveException { final Map, Partition> partitionsMap = @@ -2025,7 +2026,7 @@ private void constructOneLBLocationMap(FileStatus fSta, // Get all valid partition paths and existing partitions for them (if any) final Table tbl = getTable(tableName); - final Set validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, mmWriteId); + final Set validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, txnId, stmtId); final int partsToLoad = validPartitions.size(); final AtomicInteger partitionsLoaded = new AtomicInteger(0); @@ -2059,7 +2060,7 @@ public Void call() throws Exception { Utilities.LOG14535.info("loadPartition called for DPP from " + partPath + " to " + tbl.getTableName()); Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace, true, numLB > 0, - false, isAcid, hasFollowingStatsTask, mmWriteId); + false, operation, hasFollowingStatsTask, txnId, stmtId); partitionsMap.put(fullPartSpec, newPartition); if (inPlaceEligible) { @@ -2078,7 +2079,7 @@ public Void call() throws Exception { + " partSpec=" + fullPartSpec + ", " + " replace=" + replace + ", " + " listBucketingLevel=" + numLB + ", " - + " isAcid=" + isAcid + ", " + + " isAcid=" + AcidUtils.isAcidType(operation) + ", " + " hasFollowingStatsTask=" + hasFollowingStatsTask, t); throw t; } @@ -2091,9 +2092,10 @@ public Void call() throws Exception { for (Future future : futures) { future.get(); } - if (mmWriteId != null) { + if (txnId != null) { // Commit after we have processed all the partitions. 
- commitMmTableWrite(tbl, mmWriteId); + // todo this shouldn't be needed +// commitMmTableWrite(tbl, txnId); } } catch (InterruptedException | ExecutionException e) { LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks"); @@ -2107,7 +2109,7 @@ public Void call() throws Exception { } try { - if (isAcid) { + if (AcidUtils.isAcidType(operation)) { List partNames = new ArrayList<>(partitionsMap.size()); for (Partition p : partitionsMap.values()) { partNames.add(p.getName()); @@ -2145,8 +2147,7 @@ public Void call() throws Exception { */ public void loadTable(Path loadPath, String tableName, boolean replace, boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask, - Long mmWriteId) throws HiveException { - + Long txnId, int stmtId) throws HiveException { List newFiles = null; Table tbl = getTable(tableName); HiveConf sessionConf = SessionState.getSessionConf(); @@ -2154,28 +2155,28 @@ public void loadTable(Path loadPath, String tableName, boolean replace, boolean newFiles = Collections.synchronizedList(new ArrayList()); } // TODO: this assumes both paths are qualified; which they are, currently. - if (mmWriteId != null && loadPath.equals(tbl.getPath())) { + if (txnId != null && loadPath.equals(tbl.getPath())) { Utilities.LOG14535.info("not moving " + loadPath + " to " + tbl.getPath()); if (replace) { Path tableDest = tbl.getPath(); deleteOldPathForReplace(tableDest, tableDest, sessionConf, - new ValidWriteIds.IdPathFilter(mmWriteId, false, true), mmWriteId != null, + new ValidWriteIds.IdPathFilter(txnId, stmtId, false, true), txnId != null, tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0); } - newFiles = listFilesCreatedByQuery(loadPath, mmWriteId); + newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId); } else { // Either a non-MM query, or a load into MM table from an external source. Path tblPath = tbl.getPath(), destPath = tblPath; PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER; - if (mmWriteId != null) { + if (txnId != null) { // We will load into MM directory, and delete from the parent if needed. - destPath = new Path(destPath, ValidWriteIds.getMmFilePrefix(mmWriteId)); - filter = replace ? new ValidWriteIds.IdPathFilter(mmWriteId, false, true) : filter; + destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); + filter = replace ? 
new ValidWriteIds.IdPathFilter(txnId, stmtId, false, true) : filter; } Utilities.LOG14535.info("moving " + loadPath + " to " + tblPath); if (replace) { replaceFiles(tblPath, loadPath, destPath, tblPath, - sessionConf, isSrcLocal, filter, mmWriteId != null); + sessionConf, isSrcLocal, filter, txnId != null); } else { try { FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf); @@ -2217,8 +2218,9 @@ public void loadTable(Path loadPath, String tableName, boolean replace, boolean throw new HiveException(e); } - if (mmWriteId != null) { - commitMmTableWrite(tbl, mmWriteId); + if (txnId != null) { + // todo this shouldn't be needed +// commitMmTableWrite(tbl, txnId); } fireInsertEvent(tbl, null, newFiles); @@ -4269,6 +4271,7 @@ public void addForeignKey(List foreignKeyCols) } } + // todo should deprecate public long getNextTableWriteId(String dbName, String tableName) throws HiveException { try { return getMSC().getNextTableWriteId(dbName, tableName); @@ -4277,6 +4280,7 @@ public long getNextTableWriteId(String dbName, String tableName) throws HiveExce } } + // todo should deprecate public ValidWriteIds getValidWriteIdsForTable( String dbName, String tableName) throws HiveException { try { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index a777475..e2b846d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1266,7 +1266,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, // FileSinkDesc fsInputDesc = fsInput.getConf(); Utilities.LOG14535.info("Creating merge work from " + System.identityHashCode(fsInput) - + " with write ID " + (fsInputDesc.isMmTable() ? fsInputDesc.getMmWriteId() : null) + " into " + finalName); + + " with write ID " + (fsInputDesc.isMmTable() ? fsInputDesc.getTxnIdObj() : null) + " into " + finalName); boolean isBlockMerge = (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) && fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) || @@ -1274,7 +1274,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, fsInputDesc.getTableInfo().getInputFileFormatClass().equals(OrcInputFormat.class)); RowSchema inputRS = fsInput.getSchema(); - Long srcMmWriteId = fsInputDesc.isMmTable() ? fsInputDesc.getMmWriteId() : null; + Long srcMmWriteId = fsInputDesc.isMmTable() ? fsInputDesc.getTxnIdObj() : null; FileSinkDesc fsOutputDesc = null; TableScanOperator tsMerge = null; if (!isBlockMerge) { @@ -1286,7 +1286,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone(); Path mergeDest = srcMmWriteId == null ? finalName : finalName.getParent(); fsOutputDesc = new FileSinkDesc(mergeDest, ts, conf.getBoolVar(ConfVars.COMPRESSRESULT)); - fsOutputDesc.setMmWriteId(srcMmWriteId); + fsOutputDesc.setTxnIdObj(srcMmWriteId); fsOutputDesc.setIsMerge(true); // Create and attach the filesink for the merge. 
OperatorFactory.getAndMakeChild(fsOutputDesc, inputRS, tsMerge); @@ -1636,7 +1636,8 @@ public static MapWork createMergeTask(FileSinkDesc fsInputDesc, Path finalName, } else { fmd = new OrcFileMergeDesc(); } - fmd.setMmWriteId(fsInputDesc.getMmWriteId()); + fmd.setTxnId(fsInputDesc.getTxnIdObj()); + fmd.setStmtId(fsInputDesc.getStatementId()); fmd.setDpCtx(fsInputDesc.getDynPartCtx()); fmd.setOutputPath(finalName); fmd.setHasDynamicPartitions(work.hasDynamicPartitions()); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index 3e749eb..14a745f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -119,6 +119,9 @@ // whether any ACID table is involved in a query protected boolean acidInQuery; + // Similar to acidFileSinks + protected Set mmFileSinks = new HashSet(); + public static int HIVE_COLUMN_ORDER_ASC = 1; public static int HIVE_COLUMN_ORDER_DESC = 0; public static int HIVE_COLUMN_NULLS_FIRST = 0; @@ -1332,6 +1335,10 @@ public QueryProperties getQueryProperties() { return acidFileSinks; } + public Set getMmFileSinks() { + return mmFileSinks; + } + public boolean hasAcidInQuery() { return acidInQuery; } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index b5820d6..238f9a5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.parse; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.common.ValidWriteIds; @@ -214,6 +216,9 @@ public static void prepareExport( ? ts.tableHandle.getSkewedColNames().size() : 0; ValidWriteIds ids = isMmTable ? db.getValidWriteIdsForTable( ts.tableHandle.getDbName(), ts.tableHandle.getTableName()) : null; + ValidTxnList validTxnList; + String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); + validTxnList = txnString == null ? new ValidReadTxnList() : new ValidReadTxnList(txnString); if (ts.tableHandle.isPartitioned()) { for (Partition partition : partitions) { Path fromPath = partition.getDataLocation(); @@ -265,7 +270,7 @@ private static CopyWork createCopyWork(boolean isMmTable, int lbLevels, ValidWri fromPath = fromPath.getFileSystem(conf).makeQualified(fromPath); validPaths = Utilities.getValidMmDirectoriesFromTableOrPart(fromPath, conf, ids, lbLevels); } - if (validPaths == null) { + if (validPaths == null || validPaths.isEmpty()) { return new CopyWork(fromPath, toDataPath, false); // Not MM, or no need to skip anything. 
} else { return createCopyWorkForValidPaths(fromPath, toDataPath, validPaths); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 6df9ad5..c69da76 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -40,7 +40,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.common.ValidWriteIds; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.TableType; @@ -58,7 +57,9 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; @@ -297,27 +298,28 @@ public static boolean prepareImport( tableExists = true; } - Long mmWriteId = null; + Long txnId = null; + int stmtId = 0; if (table != null && MetaStoreUtils.isInsertOnlyTable(table.getParameters())) { - mmWriteId = x.getHive().getNextTableWriteId(table.getDbName(), table.getTableName()); + txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); } else if (table == null && isSourceMm) { // We could import everything as is - directories and IDs, but that won't work with ACID // txn ids in future. So, let's import everything into the new MM directory with ID == 0. - mmWriteId = 0l; + txnId = 0l; } - if (mmWriteId != null) { - tblDesc.setInitialMmWriteId(mmWriteId); + if (txnId != null) { + tblDesc.setInitialMmWriteId(txnId); } if (!replicationSpec.isInReplicationScope()) { createRegularImportTasks( tblDesc, partitionDescs, isPartSpecSet, replicationSpec, table, - fromURI, fs, wh, x, mmWriteId, isSourceMm); + fromURI, fs, wh, x, txnId, stmtId, isSourceMm); } else { createReplImportTasks( tblDesc, partitionDescs, isPartSpecSet, replicationSpec, waitOnPrecursor, table, - fromURI, fs, wh, x, mmWriteId, isSourceMm); + fromURI, fs, wh, x, txnId, stmtId, isSourceMm); } return tableExists; } @@ -378,17 +380,17 @@ private static CreateTableDesc getBaseCreateTableDescFromTable(String dbName, private static Task loadTable(URI fromURI, Table table, boolean replace, Path tgtPath, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, - Long mmWriteId, boolean isSourceMm) { + Long txnId, int stmtId, boolean isSourceMm) { Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME); - Path destPath = mmWriteId == null ? x.getCtx().getExternalTmpPath(tgtPath) - : new Path(tgtPath, ValidWriteIds.getMmFilePrefix(mmWriteId)); + Path destPath = txnId == null ? x.getCtx().getExternalTmpPath(tgtPath) + : new Path(tgtPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); Utilities.LOG14535.info("adding import work for table with source location: " - + dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " - + mmWriteId + " (src " + isSourceMm + ") for " + (table == null ? 
"a new table" : table.getTableName())); + + dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; txnId " + + txnId + " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName())); Task copyTask = null; if (replicationSpec.isInReplicationScope()) { - if (isSourceMm || mmWriteId != null) { + if (isSourceMm || txnId != null) { // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed. throw new RuntimeException( "Not supported right now because Replication is completely screwed"); @@ -401,7 +403,9 @@ private static CreateTableDesc getBaseCreateTableDescFromTable(String dbName, } LoadTableDesc loadTableWork = new LoadTableDesc(destPath, - Utilities.getTableDesc(table), new TreeMap(), replace, mmWriteId); + Utilities.getTableDesc(table), new TreeMap(), replace, txnId); + loadTableWork.setTxnId(txnId); + loadTableWork.setStmtId(stmtId); MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false); Task loadTableTask = TaskFactory.get(mv, x.getConf()); copyTask.addDependentTask(loadTableTask); @@ -457,7 +461,7 @@ private static CreateTableDesc getBaseCreateTableDescFromTable(String dbName, private static Task addSinglePartition(URI fromURI, FileSystem fs, CreateTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, - EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm, + EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, boolean isSourceMm, Task commitTask) throws MetaException, IOException, HiveException { AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); @@ -476,17 +480,17 @@ private static CreateTableDesc getBaseCreateTableDescFromTable(String dbName, + partSpecToString(partSpec.getPartSpec()) + " with source location: " + srcLocation); Path tgtLocation = new Path(partSpec.getLocation()); - Path destPath = mmWriteId == null ? x.getCtx().getExternalTmpPath(tgtLocation) - : new Path(tgtLocation, ValidWriteIds.getMmFilePrefix(mmWriteId)); - Path moveTaskSrc = mmWriteId == null ? destPath : tgtLocation; + Path destPath = txnId == null ? x.getCtx().getExternalTmpPath(tgtLocation) + : new Path(tgtLocation, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); + Path moveTaskSrc = txnId == null ? destPath : tgtLocation; Utilities.LOG14535.info("adding import work for partition with source location: " - + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm " - + mmWriteId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec())); + + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; txnId " + + txnId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec())); Task copyTask = null; if (replicationSpec.isInReplicationScope()) { - if (isSourceMm || mmWriteId != null) { + if (isSourceMm || txnId != null) { // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed. 
throw new RuntimeException( "Not supported right now because Replication is completely screwed"); @@ -502,11 +506,13 @@ private static CreateTableDesc getBaseCreateTableDescFromTable(String dbName, Task addPartTask = TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf()); LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table), - partSpec.getPartSpec(), true, mmWriteId); + partSpec.getPartSpec(), true, txnId); + loadTableWork.setTxnId(txnId); + loadTableWork.setStmtId(stmtId); loadTableWork.setInheritTableSpecs(false); // Do not commit the write ID from each task; need to commit once. // TODO: we should just change the import to use a single MoveTask, like dynparts. - loadTableWork.setIntermediateInMmWrite(mmWriteId != null); + loadTableWork.setIntermediateInMmWrite(txnId != null); Task loadPartTask = TaskFactory.get(new MoveWork( x.getInputs(), x.getOutputs(), loadTableWork, null, false), x.getConf()); copyTask.addDependentTask(loadPartTask); @@ -802,21 +808,21 @@ private static String checkParams(Map map1, private static void createRegularImportTasks( CreateTableDesc tblDesc, List partitionDescs, boolean isPartSpecSet, ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh, - EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm) + EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, boolean isSourceMm) throws HiveException, URISyntaxException, IOException, MetaException { if (table != null) { if (table.isPartitioned()) { x.getLOG().debug("table partitioned"); Task ict = createImportCommitTask( - table.getDbName(), table.getTableName(), mmWriteId, x.getConf()); + table.getDbName(), table.getTableName(), txnId, stmtId, x.getConf()); for (AddPartitionDesc addPartitionDesc : partitionDescs) { Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); org.apache.hadoop.hive.ql.metadata.Partition ptn = null; if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, mmWriteId, isSourceMm, ict)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict)); } else { throw new SemanticException( ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec))); @@ -828,7 +834,7 @@ private static void createRegularImportTasks( Path tgtPath = new Path(table.getDataLocation().toString()); FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf()); checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x); - loadTable(fromURI, table, false, tgtPath, replicationSpec, x, mmWriteId, isSourceMm); + loadTable(fromURI, table, false, tgtPath, replicationSpec, x, txnId, stmtId, isSourceMm); } // Set this to read because we can't overwrite any existing partitions x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK)); @@ -846,10 +852,10 @@ private static void createRegularImportTasks( if (isPartitioned(tblDesc)) { Task ict = createImportCommitTask( - tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId, x.getConf()); + tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, stmtId, x.getConf()); for (AddPartitionDesc addPartitionDesc : partitionDescs) { t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, - replicationSpec, x, mmWriteId, isSourceMm, ict)); + replicationSpec, x, txnId, stmtId, isSourceMm, ict)); } } else { 
x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); @@ -866,7 +872,7 @@ private static void createRegularImportTasks( } FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf()); checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x); - t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, mmWriteId, isSourceMm)); + t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, txnId, stmtId, isSourceMm)); } } x.getTasks().add(t); @@ -874,10 +880,10 @@ private static void createRegularImportTasks( } private static Task createImportCommitTask( - String dbName, String tblName, Long mmWriteId, HiveConf conf) { + String dbName, String tblName, Long txnId, int stmtId, HiveConf conf) { @SuppressWarnings("unchecked") - Task ict = (mmWriteId == null) ? null : TaskFactory.get( - new ImportCommitWork(dbName, tblName, mmWriteId), conf); + Task ict = (txnId == null) ? null : TaskFactory.get( + new ImportCommitWork(dbName, tblName, txnId, stmtId), conf); return ict; } @@ -889,7 +895,7 @@ private static void createReplImportTasks( List partitionDescs, boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnPrecursor, Table table, URI fromURI, FileSystem fs, Warehouse wh, - EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm) + EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, boolean isSourceMm) throws HiveException, URISyntaxException, IOException, MetaException { Task dr = null; @@ -958,15 +964,15 @@ private static void createReplImportTasks( if (!replicationSpec.isMetadataOnly()) { if (isPartitioned(tblDesc)) { Task ict = createImportCommitTask( - tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId, x.getConf()); + tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, stmtId, x.getConf()); for (AddPartitionDesc addPartitionDesc : partitionDescs) { addPartitionDesc.setReplicationSpec(replicationSpec); t.addDependentTask( - addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, mmWriteId, isSourceMm, ict)); + addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict)); } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); - t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, mmWriteId, isSourceMm)); + t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, txnId, stmtId, isSourceMm)); } } if (dr == null){ @@ -986,11 +992,11 @@ private static void createReplImportTasks( Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); org.apache.hadoop.hive.ql.metadata.Partition ptn = null; Task ict = replicationSpec.isMetadataOnly() ? 
null : createImportCommitTask( - tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId, x.getConf()); + tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, stmtId, x.getConf()); if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, mmWriteId, isSourceMm, ict)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict)); } } else { // If replicating, then the partition already existing means we need to replace, maybe, if @@ -998,7 +1004,7 @@ private static void createReplImportTasks( if (replicationSpec.allowReplacementInto(ptn)){ if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, mmWriteId, isSourceMm, ict)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict)); } else { x.getTasks().add(alterSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x)); @@ -1026,7 +1032,7 @@ private static void createReplImportTasks( } if (!replicationSpec.isMetadataOnly()) { // repl-imports are replace-into - loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x, mmWriteId, isSourceMm); + loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x, txnId, stmtId, isSourceMm); } else { x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java index d3b4da1..06e186e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java @@ -79,9 +79,9 @@ public IndexUpdater(LoadTableDesc loadTableWork, Set inputs, Map partSpec = ltd.getPartitionSpec(); if (partSpec == null || partSpec.size() == 0) { //unpartitioned table, update whole index - doIndexUpdate(tblIndexes, ltd.getMmWriteId()); + doIndexUpdate(tblIndexes, ltd.getTxnId()); } else { - doIndexUpdate(tblIndexes, partSpec, ltd.getMmWriteId()); + doIndexUpdate(tblIndexes, partSpec, ltd.getTxnId()); } } return tasks; @@ -101,15 +101,15 @@ private void doIndexUpdate(List tblIndexes, Long mmWriteId) throws HiveEx } private void doIndexUpdate(List tblIndexes, Map - partSpec, Long mmWriteId) throws HiveException { + partSpec, Long txnId) throws HiveException { for (Index index : tblIndexes) { if (containsPartition(index, partSpec)) { - doIndexUpdate(index, partSpec, mmWriteId); + doIndexUpdate(index, partSpec, txnId); } } } - private void doIndexUpdate(Index index, Map partSpec, Long mmWriteId) + private void doIndexUpdate(Index index, Map partSpec, Long txnId) throws HiveException { StringBuilder ps = new StringBuilder(); boolean first = true; @@ -134,17 +134,17 @@ private void doIndexUpdate(Index index, Map partSpec, Long mmWri sb.append(" PARTITION "); sb.append(ps.toString()); sb.append(" REBUILD"); - compileRebuild(sb.toString(), index, mmWriteId); + compileRebuild(sb.toString(), index, txnId); } - private void compileRebuild(String query, Index index, Long mmWriteId) + private void compileRebuild(String query, Index index, Long txnId) throws HiveException { Driver driver = new Driver(this.conf); driver.compile(query, false); - if (mmWriteId != null) { + if (txnId != null) { // TODO: this is rather fragile 
ValidWriteIds.addCurrentToConf( - parentConf, index.getDbName(), index.getOrigTableName(), mmWriteId); + parentConf, index.getDbName(), index.getOrigTableName(), txnId); } tasks.addAll(driver.getPlan().getRootTasks()); inputs.addAll(driver.getPlan().getInputs()); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index 04e8cac..6cc9abb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -55,6 +56,7 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.StatsWork; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapred.InputFormat; import com.google.common.collect.Lists; @@ -271,19 +273,18 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { } } - Long mmWriteId = null; + Long txnId = null; + int stmtId = 0; Table tbl = ts.tableHandle; if (MetaStoreUtils.isInsertOnlyTable(tbl.getParameters())) { - try { - mmWriteId = db.getNextTableWriteId(tbl.getDbName(), tbl.getTableName()); - } catch (HiveException e) { - throw new SemanticException(e); - } + txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); } LoadTableDesc loadTableWork; loadTableWork = new LoadTableDesc(new Path(fromURI), - Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite, mmWriteId); + Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite, txnId); + loadTableWork.setTxnId(txnId); + loadTableWork.setStmtId(stmtId); if (preservePartitionSpecs){ // Note : preservePartitionSpecs=true implies inheritTableSpecs=false but // but preservePartitionSpecs=false(default) here is not sufficient enough diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index b5f79c8..715a363 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -125,6 +125,7 @@ private ColumnAccessInfo columnAccessInfo; private boolean needViewColumnAuthorization; private Set acidFileSinks = Collections.emptySet(); + private Set mmFileSinks = Collections.emptySet(); // Map to store mapping between reduce sink Operator and TS Operator for semijoin private Map rsOpToTsOpMap = @@ -192,7 +193,7 @@ public ParseContext( List reduceSinkOperatorsAddedByEnforceBucketingSorting, AnalyzeRewriteContext analyzeRewrite, CreateTableDesc createTableDesc, CreateViewDesc createViewDesc, QueryProperties queryProperties, - Map viewProjectToTableSchema, Set acidFileSinks) { + Map viewProjectToTableSchema, Set acidFileSinks, Set mmFileSinks) { this.queryState = queryState; this.conf = queryState.getConf(); this.opToPartPruner = opToPartPruner; @@ -235,6 +236,10 @@ public ParseContext( this.acidFileSinks = new HashSet<>(); this.acidFileSinks.addAll(acidFileSinks); } + if (mmFileSinks != null && !mmFileSinks.isEmpty()) { + this.mmFileSinks = new HashSet<>(); + 
this.mmFileSinks.addAll(mmFileSinks); + } } public Set getAcidSinks() { return acidFileSinks; @@ -242,6 +247,11 @@ public ParseContext( public boolean hasAcidWrite() { return !acidFileSinks.isEmpty(); } + + public Set getMmFileSinks() { + return mmFileSinks; + } + /** * @return the context */ diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 2d1d47f..4b4464f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -477,7 +477,7 @@ public ParseContext getParseContext() { listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting, - analyzeRewrite, tableDesc, createVwDesc, queryProperties, viewProjectToTableSchema, acidFileSinks); + analyzeRewrite, tableDesc, createVwDesc, queryProperties, viewProjectToTableSchema, acidFileSinks, mmFileSinks); } public CompilationOpContext getOpContext() { @@ -6693,7 +6693,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) ListBucketingCtx lbCtx = null; Map partSpec = null; boolean isMmTable = false, isMmCtas = false; - Long mmWriteId = null; + Long txnId = null; switch (dest_type.intValue()) { case QBMetaData.DEST_TABLE: { @@ -6750,14 +6750,15 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) acidOp = getAcidType(dest_tab, table_desc.getOutputFileFormatClass(), dest); checkAcidConstraints(qb, table_desc, dest_tab, acidOp); } - try { - mmWriteId = getMmWriteId(dest_tab, isMmTable); - } catch (HiveException e) { - throw new SemanticException(e); + if (MetaStoreUtils.isInsertOnlyTable(table_desc.getProperties())) { + acidOp = getAcidType(dest_tab, table_desc.getOutputFileFormatClass(), dest); + } + if (isMmTable) { + txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); } boolean isReplace = !qb.getParseInfo().isInsertIntoTable( dest_tab.getDbName(), dest_tab.getTableName()); - ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, mmWriteId); + ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, txnId); ltd.setLbCtx(lbCtx); loadTableWork.add(ltd); } else { @@ -6813,13 +6814,13 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) acidOp = getAcidType(dest_tab, table_desc.getOutputFileFormatClass(), dest); checkAcidConstraints(qb, table_desc, dest_tab, acidOp); } - try { - mmWriteId = getMmWriteId(dest_tab, isMmTable); - } catch (HiveException e) { - // How is this a semantic exception? Stupid Java and signatures. 
- throw new SemanticException(e); + if (MetaStoreUtils.isInsertOnlyTable(dest_part.getTable().getParameters())) { + acidOp = getAcidType(dest_tab, table_desc.getOutputFileFormatClass(), dest); } - ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, mmWriteId); + if (isMmTable) { + txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); + } + ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, txnId); ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); ltd.setLbCtx(lbCtx); @@ -6853,10 +6854,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) destTableIsMaterialization = tblDesc.isMaterialization(); if (!destTableIsTemporary && MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps(), true)) { isMmTable = isMmCtas = true; - // TODO# this should really get current ACID txn; assuming ACID works correctly the txn - // should have been opened to create the ACID table. For now use the first ID. - mmWriteId = 0l; - tblDesc.setInitialMmWriteId(mmWriteId); + txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); + tblDesc.setInitialMmWriteId(txnId); } } else if (viewDesc != null) { field_schemas = new ArrayList(); @@ -6983,11 +6982,11 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx); } - assert isMmTable == (mmWriteId != null); + assert isMmTable == (txnId != null); FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, table_desc, dest_part, dest_path, currentTableId, destTableIsAcid, destTableIsTemporary, destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS, - canBeMerged, mmWriteId, isMmCtas); + canBeMerged, txnId, isMmCtas); if (isMmCtas) { // Add FSD so that the LoadTask compilation could fix up its path to avoid the move. tableDesc.setWriter(fileSinkDesc); @@ -7115,7 +7114,12 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc, MetaStoreUtils.isInsertOnlyTable(dest_part.getTable().getParameters())) || (table_desc != null && MetaStoreUtils.isInsertOnlyTable(table_desc.getProperties())); - if (destTableIsAcid && !isDestInsertOnly) { + if (isDestInsertOnly) { + fileSinkDesc.setWriteType(Operation.INSERT_ONLY); + mmFileSinks.add(fileSinkDesc); + } + + if (destTableIsAcid) { AcidUtils.Operation wt = updating(dest) ? AcidUtils.Operation.UPDATE : (deleting(dest) ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT); fileSinkDesc.setWriteType(wt); @@ -11138,7 +11142,7 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticExce listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting, - analyzeRewrite, tableDesc, createVwDesc, queryProperties, viewProjectToTableSchema, acidFileSinks); + analyzeRewrite, tableDesc, createVwDesc, queryProperties, viewProjectToTableSchema, acidFileSinks, mmFileSinks); // 5. 
Take care of view creation if (createVwDesc != null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 91c343c..d363bfe 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -20,23 +20,19 @@ import java.io.Serializable; import java.util.ArrayList; -import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; -import java.util.LinkedList; import java.util.List; -import java.util.Queue; import java.util.Set; -import java.util.Stack; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.HiveStatsUtils; -import org.apache.hadoop.hive.common.ValidWriteIds; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.Context; @@ -44,7 +40,6 @@ import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.ColumnStatsTask; import org.apache.hadoop.hive.ql.exec.FetchTask; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.StatsTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -56,7 +51,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; -import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext; import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc; import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; @@ -319,21 +313,22 @@ public void compile(final ParseContext pCtx, final List partitionCols, final DynamicPartitionCtx dpCtx, Path destPath, - Long mmWriteId, boolean isMmCtas) { + Long txnIdObj, boolean isMmCtas) { this.dirName = dirName; this.tableInfo = tableInfo; @@ -130,7 +131,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, this.dpCtx = dpCtx; this.dpSortState = DPSortState.NONE; this.destPath = destPath; - this.mmWriteId = mmWriteId; + this.txnIdObj = txnIdObj; this.isMmCtas = isMmCtas; } @@ -153,7 +154,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, public Object clone() throws CloneNotSupportedException { FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed, destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles, - partitionCols, dpCtx, destPath, mmWriteId, isMmCtas); + partitionCols, dpCtx, destPath, txnIdObj, isMmCtas); ret.setCompressCodec(compressCodec); ret.setCompressType(compressType); ret.setGatherStats(gatherStats); @@ -165,6 +166,7 @@ public Object clone() throws CloneNotSupportedException { ret.setDpSortState(dpSortState); ret.setWriteType(writeType); ret.setTransactionId(txnId); + ret.setTxnIdObj(txnId); ret.setStatsTmpDir(statsTmpDir); ret.setIsMerge(isMerge); return ret; @@ -202,8 +204,8 @@ public Path getFinalDirName() { /** getFinalDirName that takes into account MM, but not DP, LB or buckets. 
*/ public Path getMergeInputDirName() { Path root = getFinalDirName(); - if (mmWriteId == null) return root; - return new Path(root, ValidWriteIds.getMmFilePrefix(mmWriteId)); + if (txnIdObj == null) return root; + return new Path(root, AcidUtils.deltaSubdir(txnId, txnId, 0)); // todo stmtid } @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -276,11 +278,19 @@ public void setTemporary(boolean temporary) { } public boolean isMmTable() { - return mmWriteId != null; + return writeType == AcidUtils.Operation.INSERT_ONLY; } - public Long getMmWriteId() { - return mmWriteId; +// public Long getMmWriteId() { +// return mmWriteId; +// } + + public Long getTxnIdObj() { + return txnIdObj; + } + + public void setTxnIdObj(Long txnIdObj) { + this.txnIdObj = txnIdObj; } public boolean isMaterialization() { @@ -470,6 +480,7 @@ public void setWriteType(AcidUtils.Operation type) { public void setTransactionId(long id) { txnId = id; + txnIdObj = id; } public long getTransactionId() { return txnId; @@ -505,9 +516,9 @@ public void setStatsTmpDir(String statsCollectionTempDir) { this.statsTmpDir = statsCollectionTempDir; } - public void setMmWriteId(Long mmWriteId) { - this.mmWriteId = mmWriteId; - } +// public void setMmWriteId(Long mmWriteId) { +// this.mmWriteId = mmWriteId; +// } public void setIsMerge(boolean b) { this.isMerge = b; diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index 684f1c1..4e7ffce 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -42,7 +42,8 @@ // Need to remember whether this is an acid compliant operation, and if so whether it is an // insert, update, or delete. 
private AcidUtils.Operation writeType; - private Long mmWriteId; + private Long txnId; + private int stmtId; // TODO: the below seems like they should just be combined into partitionDesc private org.apache.hadoop.hive.ql.plan.TableDesc table; @@ -65,10 +66,10 @@ public LoadTableDesc(final Path sourcePath, final org.apache.hadoop.hive.ql.plan.TableDesc table, final Map partitionSpec, final boolean replace, - final AcidUtils.Operation writeType, Long mmWriteId) { + final AcidUtils.Operation writeType, Long txnId) { super(sourcePath); Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/); - init(table, partitionSpec, replace, writeType, mmWriteId); + init(table, partitionSpec, replace, writeType, txnId); } /** @@ -82,15 +83,15 @@ public LoadTableDesc(final Path sourcePath, final TableDesc table, final Map partitionSpec, final boolean replace, - final Long mmWriteId) { - this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID, mmWriteId); + final Long txnId) { + this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID, txnId); } public LoadTableDesc(final Path sourcePath, final org.apache.hadoop.hive.ql.plan.TableDesc table, final Map partitionSpec, - final AcidUtils.Operation writeType, Long mmWriteId) { - this(sourcePath, table, partitionSpec, true, writeType, mmWriteId); + final AcidUtils.Operation writeType, Long txnId) { + this(sourcePath, table, partitionSpec, true, writeType, txnId); } /** @@ -101,22 +102,22 @@ public LoadTableDesc(final Path sourcePath, */ public LoadTableDesc(final Path sourcePath, final org.apache.hadoop.hive.ql.plan.TableDesc table, - final Map partitionSpec, Long mmWriteId) { - this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, mmWriteId); + final Map partitionSpec, Long txnId) { + this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, txnId); } public LoadTableDesc(final Path sourcePath, final org.apache.hadoop.hive.ql.plan.TableDesc table, final DynamicPartitionCtx dpCtx, final AcidUtils.Operation writeType, - boolean isReplace, Long mmWriteId) { + boolean isReplace, Long txnId) { super(sourcePath); Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/); this.dpCtx = dpCtx; if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) { - init(table, dpCtx.getPartSpec(), isReplace, writeType, mmWriteId); + init(table, dpCtx.getPartSpec(), isReplace, writeType, txnId); } else { - init(table, new LinkedHashMap(), isReplace, writeType, mmWriteId); + init(table, new LinkedHashMap(), isReplace, writeType, txnId); } } @@ -124,12 +125,12 @@ private void init( final org.apache.hadoop.hive.ql.plan.TableDesc table, final Map partitionSpec, final boolean replace, - AcidUtils.Operation writeType, Long mmWriteId) { + AcidUtils.Operation writeType, Long txnId) { this.table = table; this.partitionSpec = partitionSpec; this.replace = replace; this.writeType = writeType; - this.mmWriteId = mmWriteId; + this.txnId = txnId; } @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -157,11 +158,11 @@ public boolean getReplace() { @Explain(displayName = "micromanaged table") public Boolean isMmTableExplain() { - return mmWriteId != null? 
true : null; + return writeType == AcidUtils.Operation.INSERT_ONLY; } public boolean isMmTable() { - return mmWriteId != null; + return writeType == AcidUtils.Operation.INSERT_ONLY; } public void setReplace(boolean replace) { @@ -202,8 +203,20 @@ public void setLbCtx(ListBucketingCtx lbCtx) { return writeType; } - public Long getMmWriteId() { - return mmWriteId; + public void setTxnId(Long txnId) { + this.txnId = txnId; + } + + public Long getTxnId() { + return txnId; + } + + public int getStmtId() { + return stmtId; + } + + public void setStmtId(int stmtId) { + this.stmtId = stmtId; } public void setIntermediateInMmWrite(boolean b) { diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java index 4a13e1f..55b9da9 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java @@ -141,7 +141,7 @@ db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true); db.createTable(src, cols, null, TextInputFormat.class, HiveIgnoreKeyTextOutputFormat.class); - db.loadTable(hadoopDataFile[i], src, false, true, false, false, false, null); + db.loadTable(hadoopDataFile[i], src, false, true, false, false, false, null, 0); i++; } diff --git ql/src/test/results/clientpositive/mm_insertonly_acid.q.out ql/src/test/results/clientpositive/mm_insertonly_acid.q.out index 6f7d198..22bdc93 100644 --- ql/src/test/results/clientpositive/mm_insertonly_acid.q.out +++ ql/src/test/results/clientpositive/mm_insertonly_acid.q.out @@ -44,24 +44,24 @@ STAGE PLANS: Map Operator Tree: TableScan alias: qtr_acid - Statistics: Num rows: 16 Data size: 67 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 20 Data size: 47 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: key (type: int), p (type: int) outputColumnNames: _col0, _col1 - Statistics: Num rows: 16 Data size: 67 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 20 Data size: 47 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator key expressions: _col0 (type: int) sort order: + - Statistics: Num rows: 16 Data size: 67 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 20 Data size: 47 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: int) Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey0 (type: int), VALUE._col0 (type: int) outputColumnNames: _col0, _col1 - Statistics: Num rows: 16 Data size: 67 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 20 Data size: 47 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false - Statistics: Num rows: 16 Data size: 67 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 20 Data size: 47 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
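
The hunks above consistently replace the old mm-prefix/write-id scheme with a delta directory derived from the transaction id and statement id (the AcidUtils.deltaSubdir(txnId, txnId, stmtId) calls) and select those directories with ValidWriteIds.IdPathFilter(txnId, stmtId, ...). The sketch below is a minimal, standalone illustration of that directory-naming and filtering idea, assuming the conventional zero-padded delta layout (seven digits for the txn id bounds, four for the statement id); DeltaNamingSketch, deltaSubdir and accept are names invented for this example and are not part of the patch or of Hive's API.

public class DeltaNamingSketch {

  // Mirrors the effect of the AcidUtils.deltaSubdir(txnId, txnId, stmtId) calls in the patch,
  // assuming the conventional zero-padded ACID delta layout.
  static String deltaSubdir(long txnId, int stmtId) {
    return String.format("delta_%07d_%07d_%04d", txnId, txnId, stmtId);
  }

  // Mirrors the basic intent of the ValidWriteIds.IdPathFilter(txnId, stmtId, ...) usages above
  // (ignoring the isIgnoreTemp flag): when isMatch is true, keep only the directory written by
  // this txn/statement; when false, keep everything else (e.g. so old data can be deleted on
  // replace, as deleteOldPathForReplace does in the patch).
  static boolean accept(String dirName, long txnId, int stmtId, boolean isMatch) {
    boolean matches = dirName.equals(deltaSubdir(txnId, stmtId));
    return isMatch == matches;
  }

  public static void main(String[] args) {
    System.out.println(deltaSubdir(17L, 2));                        // delta_0000017_0000017_0002
    System.out.println(accept(deltaSubdir(17L, 2), 17L, 2, true));  // true
    System.out.println(accept("base_0000005", 17L, 2, true));       // false
  }
}

Note also that after this patch, whether a sink or load target is treated as micromanaged is keyed off the write type (AcidUtils.Operation.INSERT_ONLY) in FileSinkDesc.isMmTable() and LoadTableDesc.isMmTable(), rather than off a non-null write id as before.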