diff --git common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
index 2f35917..ad02de1 100644
--- common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
+++ common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
@@ -33,6 +33,8 @@
   //default value means there are no open txn in the snapshot
   private long minOpenTxn = Long.MAX_VALUE;
   protected long highWatermark;
+  //todo: handle toString, etc
+  private long lowWatermark;
 
   public ValidReadTxnList() {
     this(new long[0], Long.MAX_VALUE, Long.MAX_VALUE);
@@ -65,7 +67,8 @@ public ValidReadTxnList(String value) {
 
   @Override
   public boolean isTxnValid(long txnid) {
-    if (highWatermark < txnid) {
+    //all we need for IOW
+    if (highWatermark < txnid || txnid < lowWatermark) {
       return false;
     }
     return Arrays.binarySearch(exceptions, txnid) < 0;
@@ -77,6 +80,7 @@ public boolean isTxnValid(long txnid) {
    */
   @Override
   public boolean isValidBase(long txnid) {
+    //todo: should this consider lowWatermark?
     return minOpenTxn > txnid && txnid <= highWatermark;
   }
 
   @Override
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java metastore/src/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java
index 00a1907..0206f70 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java
@@ -104,11 +104,14 @@ public LockRequestBuilder addLockComponent(LockComponent component) {
   // requests come in.  This structure depends on the fact that null is a
   // valid key in a LinkedHashMap.  So a database lock will map to (dbname, null,
   // null).
+  //Will this be an issue if we add support for Serializable isolation, which would require us to
+  //keep track of the Read Set (in addition to the Write Set)?  For example, a Merge stmt has multiple
+  //Insert branches targeting the same table, representing different ops: Upd/Del/Ins
   private static class LockTrie {
     Map trie;
 
     LockTrie() {
-      trie = new LinkedHashMap();
+      trie = new LinkedHashMap<>();
     }
 
     public void add(LockComponent comp) {
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index d378d06..a84e287 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -975,7 +975,14 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc
           /**
            * we know this is part of DP operation and so we'll get
            * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list
-           * of partitions actually chaged.
+           * of partitions actually changed.
+           *
+           * todo: what if we write directly to delta_x (i.e. no move op) but the query dies before
+           * the addDynamicPartitions() call but after delta_x was created and had data written to it?
+           * We won't have anything in TXN_COMPONENTS and the system will think this aborted
+           * txn is empty and delete it, so delta_x will now look committed...
+           * So we'd have to write some file to delta_x when the dir is created and delete it
+           * after the addDynamicPartitions() call to make this work.
            */
           updateTxnComponents = !lc.isIsDynamicPartitionWrite();
         }
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 0b0df85..2f3577f 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -403,7 +403,7 @@ public void cleanupRecords(HiveObjectType type, Database db, Table table,
    */
   public static interface MutexAPI {
     /**
-     * The {@code key} is name of the lock. Will acquire and exclusive lock or block. It retuns
+     * The {@code key} is the name of the lock.  Will acquire an exclusive lock or block.  It returns
      * a handle which must be used to release the lock. Each invocation returns a new handle.
      */
     public LockHandle acquireLock(String key) throws MetaException;
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index d981119..1f74da7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -492,6 +492,7 @@ public void run() {
       // because at that point we need access to the objects.
       Hive.get().getMSC().flushCache();
 
+      // Do semantic analysis and plan generation
       if (saHooks != null && !saHooks.isEmpty()) {
         HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 62f7c5a..70cf2b4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -248,7 +248,8 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB
       rqstBuilder.addLockComponent(comp);
       atLeastOneLock = true;
     }
-
+    //For Acid & 1/4 Acid we should get rid of the SemiShared lock, but perhaps it would be useful for
+    //non-acid tables to ensure there is only 1 insert at a time while still allowing concurrent reads
     // For each source to write to, get the appropriate lock type.  If it's
     // an OVERWRITE, we need to get an exclusive lock.  If it's an insert (no
     // overwrite) than we need a shared.  If it's update or delete then we
@@ -264,10 +265,24 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB
       Table t = null;
       switch (output.getWriteType()) {
         case DDL_EXCLUSIVE:
-        case INSERT_OVERWRITE:
           compBuilder.setExclusive();
           compBuilder.setOperationType(DataOperationType.NO_TXN);
           break;
+        case INSERT_OVERWRITE:
+          t = getTable(output);
+          //todo: add an IOW DataOperationType and split it in TxnHandler?  Or create 2 LockComponents, 1 for
+          //Insert and 1 for Delete.  2 LCs will affect the locks acquired - they will get collapsed into 1 Delete,
+          //so why bother with 2?  Or do we need 2 for proper conflict detection?
+          //DOH - we need TxnHandler to record the txn id, so we do need a new IOW DataOperationType!
+          if (AcidUtils.isAcidTable(t)) {
+            compBuilder.setSemiShared();
+            compBuilder.setOperationType(DataOperationType.DELETE);
+          }
+          else {
+            compBuilder.setExclusive();
+            compBuilder.setOperationType(DataOperationType.DELETE);
+          }
+          break;
         case INSERT:
           t = getTable(output);
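
Reviewer sketch (not part of the patch, illustrative only): the isTxnValid() change bounds visible transactions to [lowWatermark, highWatermark] minus the exceptions list. Below is a minimal standalone Java rendering of that predicate with made-up watermarks and exceptions, assuming exceptions stays a sorted array as in the existing class; the class and variable names here are hypothetical.

    import java.util.Arrays;

    // Standalone illustration of the snapshot check the patch introduces; this is
    // not the Hive class itself, just the same predicate with explicit parameters.
    public class TxnVisibilitySketch {
      static boolean isTxnValid(long txnid, long lowWatermark, long highWatermark,
                                long[] sortedExceptions) {
        // A txn outside [lowWatermark, highWatermark] is never visible.
        if (highWatermark < txnid || txnid < lowWatermark) {
          return false;
        }
        // Inside the range it is visible unless it is an open/aborted exception.
        return Arrays.binarySearch(sortedExceptions, txnid) < 0;
      }

      public static void main(String[] args) {
        long[] exceptions = {7L};   // hypothetical: txn 7 still open in the snapshot
        long low = 5L, high = 10L;  // hypothetical watermarks
        System.out.println(isTxnValid(3L, low, high, exceptions));  // false: below lowWatermark
        System.out.println(isTxnValid(7L, low, high, exceptions));  // false: in exceptions
        System.out.println(isTxnValid(9L, low, high, exceptions));  // true
      }
    }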
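
Also illustrative only: the new INSERT_OVERWRITE branch in DbTxnManager picks a semi-shared lock for full ACID tables and keeps the exclusive lock for non-ACID tables, recording the operation as DELETE in both cases. A self-contained decision sketch with stand-in enums (these are not Hive's actual LockType/DataOperationType classes):

    // Stand-in enums; names mirror the concepts in the patch, not Hive's API.
    enum LockKind { EXCLUSIVE, SEMI_SHARED }
    enum OpKind { NO_TXN, DELETE }

    public class IowLockChoiceSketch {
      // Mirrors the proposed INSERT_OVERWRITE branch: ACID tables get SEMI_SHARED,
      // non-ACID tables keep EXCLUSIVE; both record the operation as DELETE so the
      // metastore knows existing data is being replaced.
      static LockKind lockFor(boolean isAcidTable) {
        return isAcidTable ? LockKind.SEMI_SHARED : LockKind.EXCLUSIVE;
      }

      static OpKind operationFor(boolean isAcidTable) {
        return OpKind.DELETE;  // same in both branches of the patch
      }

      public static void main(String[] args) {
        System.out.println(lockFor(true) + " / " + operationFor(true));    // SEMI_SHARED / DELETE
        System.out.println(lockFor(false) + " / " + operationFor(false));  // EXCLUSIVE / DELETE
      }
    }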