diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index ed46bca..c959222 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -143,7 +143,9 @@ private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxT .inspector(getSerde().getObjectInspector()) .bucket(bucketId) .minimumTransactionId(minTxnId) - .maximumTransactionId(maxTxnID)); + .maximumTransactionId(maxTxnID) + .statementId(-1) + .finalDestination(partitionPath)); } catch (SerDeException e) { throw new SerializationError("Failed to get object inspector from Serde " + getSerde().getClass().getName(), e); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index fd9c275..f4ab0dc 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -381,7 +381,7 @@ public void commitTxn(CommitTxnRequest rqst) LOG.debug("Going to execute insert <" + s + ">"); if (stmt.executeUpdate(s) < 1) { LOG.warn("Expected to move at least one record from txn_components to " + - "completed_txn_components when committing txn!"); + "completed_txn_components when committing txn!");//this can be reasonable for an empty txn START/COMMIT } // Always access TXN_COMPONENTS before HIVE_LOCKS; @@ -1351,7 +1351,7 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException { // We want to minimize the number of concurrent lock requests being issued. If we do not we // get a large number of deadlocks in the database, since this method has to both clean - // timedout locks and insert new locks. This synchronization barrier will not eliminiate all + // timedout locks and insert new locks. This synchronization barrier will not eliminate all // deadlocks, and the code is still resilient in the face of a database deadlock. But it // will reduce the number. This could have been done via a lock table command in the // underlying database, but was not for two reasons. 
One, different databases have different @@ -1452,7 +1452,7 @@ private LockResponse checkLock(Connection dbConn, long extLockId, boolean alwaysCommit) throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { - List locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId); + List locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now LockResponse response = new LockResponse(); response.setLockid(extLockId); diff --git ql/src/java/org/apache/hadoop/hive/ql/Context.java ql/src/java/org/apache/hadoop/hive/ql/Context.java index a74bbbe..ca0d487 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -96,7 +96,6 @@ // List of Locks for this query protected List hiveLocks; - protected HiveLockManager hiveLockMgr; // Transaction manager for this query protected HiveTxnManager hiveTxnManager; diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 669e6be..d37fbc1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -385,6 +385,15 @@ public int compile(String command, boolean resetTaskIds) { SessionState.get().setupQueryCurrentTimestamp(); try { + // Initialize the transaction manager. This must be done before analyze is called. Also + // record the valid transactions for this query. We have to do this at compile time + // because we use the information in planning the query. Also, + // we want to record it at this point so that users see data valid at the point that they + // submit the query. + SessionState.get().initTxnMgr(conf); + recordValidTxns();//todo: move this to after lock acquisition of 1st query in txn + //i.e. both should happen in runInternal + command = new VariableSubstitution().substitute(conf,command); ctx = new Context(conf); ctx.setTryCount(getTryCount()); @@ -397,13 +406,6 @@ public int compile(String command, boolean resetTaskIds) { tree = ParseUtils.findRootNonNullToken(tree); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE); - // Initialize the transaction manager. This must be done before analyze is called. Also - // record the valid transactions for this query. We have to do this at compile time - // because we use the information in planning the query. Also, - // we want to record it at this point so that users see data valid at the point that they - // submit the query. - SessionState.get().initTxnMgr(conf); - recordValidTxns(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE); BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree); @@ -443,10 +445,8 @@ public int compile(String command, boolean resetTaskIds) { // to avoid returning sensitive data String queryStr = HookUtils.redactLogString(conf, command); - String operationName = ctx.getExplain() ? 
- HiveOperation.EXPLAIN.getOperationName() : SessionState.get().getCommandType(); plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId, - operationName); + SessionState.get().getHiveOperation()); conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr); @@ -505,7 +505,7 @@ public int compile(String command, boolean resetTaskIds) { downstreamError = e; console.printError(errorMessage, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return error.getErrorCode(); + return error.getErrorCode();//todo: this is bad if returned as cmd shell exit } finally { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE); dumpMetaCallTimingWithoutEx("compilation"); @@ -940,13 +940,13 @@ private void recordValidTxns() throws LockException { conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr); LOG.debug("Encoding valid txns info " + txnStr); // TODO I think when we switch to cross query transactions we need to keep this list in - // session state rather than agressively encoding it in the conf like this. We can let the + // session state rather than aggressively encoding it in the conf like this. We can let the // TableScanOperators then encode it in the conf before calling the input formats. } /** * Acquire read and write locks needed by the statement. The list of objects to be locked are - * obtained from the inputs and outputs populated by the compiler. The lock acuisition scheme is + * obtained from the inputs and outputs populated by the compiler. The lock acquisition scheme is * pretty simple. If all the locks cannot be obtained, error out. Deadlock is avoided by making * sure that the locks are lexicographically sorted. * @@ -974,25 +974,27 @@ private int acquireLocksAndOpenTxn() { "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); return 10; } - if (acidSinks != null && acidSinks.size() > 0) { + + if((txnMgr.getAutoCommit() && acidSinks != null && acidSinks.size() > 0) || + plan.getOperation() == HiveOperation.START_TRANSACTION) { // We are writing to tables in an ACID compliant way, so we need to open a transaction - long txnId = ss.getCurrentTxn(); - if (txnId == SessionState.NO_CURRENT_TXN) { - txnId = txnMgr.openTxn(userFromUGI); - ss.setCurrentTxn(txnId); - LOG.debug("Setting current transaction to " + txnId); - } - // Set the transaction id in all of the acid file sinks - if (acidSinks != null) { - for (FileSinkDesc desc : acidSinks) { - desc.setTransactionId(txnId); - } + assert !txnMgr.isTxnOpen() : "Trying to open a new txn when is already open"; + if (!txnMgr.isTxnOpen()) { + txnMgr.openTxn(userFromUGI); } // TODO Once we move to cross query transactions we need to add the open transaction to // our list of valid transactions. We don't have a way to do that right now. } + // Set the transaction id in all of the acid file sinks + if (acidSinks != null && !acidSinks.isEmpty() && txnMgr.isTxnOpen()) { + for (FileSinkDesc desc : acidSinks) { + desc.setTransactionId(txnMgr.getCurrentTxnId()); + desc.setStatementId(txnMgr.getStatementId()); + } + } + txnMgr.acquireLocks(plan, ctx, userFromUGI); return 0; @@ -1025,17 +1027,14 @@ private void releaseLocksAndCommitOrRollback(List hiveLocks, boolean c HiveTxnManager txnMgr = ss.getTxnMgr(); // If we've opened a transaction we need to commit or rollback rather than explicitly // releasing the locks. 
- if (ss.getCurrentTxn() != SessionState.NO_CURRENT_TXN && ss.isAutoCommit()) { - try { - if (commit) { - txnMgr.commitTxn(); - } else { - txnMgr.rollbackTxn(); - } - } finally { - ss.setCurrentTxn(SessionState.NO_CURRENT_TXN); + if (txnMgr.isTxnOpen()) { + if (commit) { + txnMgr.commitTxn();//both commit & rollback clear ALL locks for this tx + } else { + txnMgr.rollbackTxn(); } } else { + //since there is no tx, we only have locks for current query (if any) if (hiveLocks != null) { txnMgr.getLockManager().releaseLocks(hiveLocks); } @@ -1177,12 +1176,50 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp // Since we're reusing the compiled plan, we need to update its start time for current run plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN)); } - // the reason that we set the txn manager for the cxt here is because each // query has its own ctx object. The txn mgr is shared across the // same instance of Driver, which can run multiple queries. - ctx.setHiveTxnManager(SessionState.get().getTxnMgr()); + HiveTxnManager txnManager = SessionState.get().getTxnMgr(); + ctx.setHiveTxnManager(txnManager); + if(txnManager.isTxnOpen() && !plan.getOperation().isAllowedInTransaction()) { + try { + assert !txnManager.getAutoCommit(); + releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false); + return new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, null, + plan.getOperationName(), Long.toString(txnManager.getCurrentTxnId())); + } catch (LockException e) { + return handleHiveExcetpion(e, 12); + } + } + if(plan.getOperation().isDisallowedInAutoCommit() && txnManager.getAutoCommit()) { + //nothing to rollback/release since we are in autoCommit mode + return new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, null, + plan.getOperation().getOperationName()); + } + if(!txnManager.isTxnOpen() && !txnManager.getAutoCommit()) { + if(!(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT || plan.getOperation() == HiveOperation.START_TRANSACTION)) { + return new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, + null, plan.getOperationName()); + } + } + if(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT) { + try { + if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) { + /*if there is an open txn, we want to commit it here; this behavior matches + * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/ + releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true); + txnManager.setAutoCommit(true); + } + else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) { + txnManager.setAutoCommit(false); + } + else {/*didn't change autoCommit value - no-op*/} + } + catch(LockException e) { + return handleHiveExcetpion(e, 12); + } + } if (requiresLock()) { ret = acquireLocksAndOpenTxn(); if (ret != 0) { @@ -1207,14 +1244,17 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp //if needRequireLock is false, the release here will do nothing because there is no lock try { - releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true); + if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) { + releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true); + } + else if(plan.getOperation() == HiveOperation.ROLLBACK) { + releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false); + } + else { + //txn (if there is one started) is not finished + } } catch (LockException e) { - errorMessage = "FAILED: Hive Internal 
Error: " + Utilities.getNameMessage(e); - SQLState = ErrorMsg.findSQLState(e.getMessage()); - downstreamError = e; - console.printError(errorMessage + "\n" - + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return createProcessorResponse(12); + return handleHiveExcetpion(e, 12); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN); @@ -1237,6 +1277,15 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp return createProcessorResponse(ret); } + private CommandProcessorResponse handleHiveExcetpion(HiveException e, int ret) { + errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); + SQLState = e.getCanonicalErrorMsg() != null ? + e.getCanonicalErrorMsg().getSQLState() : ErrorMsg.findSQLState(e.getMessage()); + downstreamError = e; + console.printError(errorMessage + "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + return createProcessorResponse(ret); + } private boolean requiresLock() { if (!checkConcurrency()) { return false; diff --git ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java index a0d61f5..593b228 100644 --- ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java +++ ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo; import org.apache.hadoop.hive.ql.parse.TableAccessInfo; +import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob; import org.apache.hadoop.hive.ql.plan.api.AdjacencyType; @@ -104,14 +105,16 @@ private QueryProperties queryProperties; private transient Long queryStartTime; - private String operationName; + private final HiveOperation operation; + private Boolean autoCommitValue; public QueryPlan() { this.reducerTimeStatsPerJobList = new ArrayList(); + operation = null; } public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId, - String operationName) { + HiveOperation operation) { this.queryString = queryString; rootTasks = new ArrayList>(); @@ -132,7 +135,8 @@ public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, S query.putToQueryAttributes("queryString", this.queryString); queryProperties = sem.getQueryProperties(); queryStartTime = startTime; - this.operationName = operationName; + this.operation = operation; + this.autoCommitValue = sem.getAutoCommitValue(); } public String getQueryStr() { @@ -787,6 +791,12 @@ public void setQueryStartTime(Long queryStartTime) { } public String getOperationName() { - return operationName; + return operation == null ? 
null : operation.getOperationName(); + } + public HiveOperation getOperation() { + return operation; + } + public Boolean getAutoCommitValue() { + return autoCommitValue; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index b07a37a..0a466e4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -380,7 +380,7 @@ public int execute(DriverContext driverContext) { tbd.getHoldDDLTime(), isSkewedStoredAsDirs(tbd), work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, - SessionState.get().getCurrentTxn()); + SessionState.get().getTxnMgr().getCurrentTxnId()); console.printInfo("\t Time taken for load dynamic partitions : " + (System.currentTimeMillis() - startTime)); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java index e1d2395..c6d891b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java @@ -62,7 +62,7 @@ *
 *   <li>New format -
 *     <pre>
 *        $partition/base_$tid/$bucket
- *                   delta_$tid_$tid/$bucket
+ *                   delta_$tid_$tid_$stid/$bucket
 *     </pre></li>
 * </ul>
 * <p>
 @@ -71,6 +71,8 @@
 * stored sorted by the original transaction id (ascending), bucket (ascending),
 * row id (ascending), and current transaction id (descending). Thus the files
 * can be merged by advancing through the files in parallel.
+ * The stid is the unique id (within the transaction) of the statement that created
+ * this delta file.
 * <p>
    * The base files include all transactions from the beginning of time * (transaction id 0) to the transaction in the directory name. Delta diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index 0d537e1..dd90a95 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -39,7 +39,7 @@ /** * Options to control how the files are written */ - public static class Options { + public static class Options implements Cloneable { private final Configuration configuration; private FileSystem fs; private ObjectInspector inspector; @@ -53,7 +53,9 @@ private PrintStream dummyStream = null; private boolean oldStyle = false; private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id - + //unique within a transaction + private int statementId = 0; + private Path finalDestination; /** * Create the options object. * @param conf Use the given configuration @@ -63,6 +65,18 @@ public Options(Configuration conf) { } /** + * shallow clone + */ + @Override + public Options clone() { + try { + return (Options)super.clone(); + } + catch(CloneNotSupportedException ex) { + throw new RuntimeException("clone() not properly implemented: " + ex.getMessage(), ex); + } + } + /** * Use the given ObjectInspector for each record written. * @param inspector the inspector to use. * @return this @@ -185,6 +199,31 @@ public Options useDummy(PrintStream stream) { return this; } + /** + * @since 1.3.0 + * This can be set to -1 to make the system generate old style (delta_xxxx_yyyy) file names. + * This is primarily needed for testing to make sure 1.3 code can still read files created + * by older code. Also used by Comactor. + */ + public Options statementId(int id) { + if(id >= AcidUtils.MAX_STATEMENTS_PER_TXN) { + throw new RuntimeException("Too many statements for transactionId: " + maximumTransactionId); + } + if(id < -1) { + throw new IllegalArgumentException("Illegal statementId value: " + id); + } + this.statementId = id; + return this; + } + /** + * @param p where the data for this operation will eventually end up; + * basically table or partition directory in FS + */ + public Options finalDestination(Path p) { + this.finalDestination = p; + return this; + } + public Configuration getConfiguration() { return configuration; } @@ -236,6 +275,12 @@ public PrintStream getDummyStream() { boolean getOldStyle() { return oldStyle; } + public int getStatementId() { + return statementId; + } + public Path getFinalDestination() { + return finalDestination; + } } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 2214733..9ea1237 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -67,6 +67,15 @@ public boolean accept(Path path) { }; public static final String BUCKET_DIGITS = "%05d"; public static final String DELTA_DIGITS = "%07d"; + /** + * 10K statements per tx. Probably overkill ... 
since that many delta files + * would not be good for performance + */ + public static final String STATEMENT_DIGITS = "%04d"; + /** + * This must be in sync with {@link #STATEMENT_DIGITS} + */ + public static final int MAX_STATEMENTS_PER_TXN = 10000; public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$"); public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}"); public static final PathFilter originalBucketFilter = new PathFilter() { @@ -79,7 +88,7 @@ public boolean accept(Path path) { private AcidUtils() { // NOT USED } - private static final Log LOG = LogFactory.getLog(AcidUtils.class.getName()); + private static final Log LOG = LogFactory.getLog(AcidUtils.class); private static final Pattern ORIGINAL_PATTERN = Pattern.compile("[0-9]+_[0-9]+"); @@ -104,12 +113,23 @@ public static Path createBucketFile(Path subdir, int bucket) { BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket)); } - private static String deltaSubdir(long min, long max) { + /** + * This is format of delta dir name prior to Hive 1.3.x + */ + public static String deltaSubdir(long min, long max) { return DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" + String.format(DELTA_DIGITS, max); } /** + * Each write statement in a transaction creates its own delta dir. + * @since 1.3.x + */ + public static String deltaSubdir(long min, long max, int statementId) { + return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId); + } + + /** * Create a filename for a bucket file. * @param directory the partition directory * @param options the options for writing the bucket @@ -124,9 +144,15 @@ public static Path createFilename(Path directory, } else if (options.isWritingBase()) { subdir = BASE_PREFIX + String.format(DELTA_DIGITS, options.getMaximumTransactionId()); + } else if(options.getStatementId() == -1) { + //when minor compaction runs, we collapse per statement delta files inside a single + //transaction so we no longer need a statementId in the file name + subdir = deltaSubdir(options.getMinimumTransactionId(), + options.getMaximumTransactionId()); } else { subdir = deltaSubdir(options.getMinimumTransactionId(), - options.getMaximumTransactionId()); + options.getMaximumTransactionId(), + options.getStatementId()); } return createBucketFile(new Path(directory, subdir), options.getBucket()); } @@ -214,14 +240,24 @@ static long parseBase(Path path) { } public static class ParsedDelta implements Comparable { - final long minTransaction; - final long maxTransaction; - final FileStatus path; + private final long minTransaction; + private final long maxTransaction; + private final FileStatus path; + //-1 is for internal (getAcidState()) purposes and means the delta dir + //had no statement ID + private final int statementId; + /** + * for pre 1.3.x delta files + */ ParsedDelta(long min, long max, FileStatus path) { + this(min, max, path, -1); + } + ParsedDelta(long min, long max, FileStatus path, int statementId) { this.minTransaction = min; this.maxTransaction = max; this.path = path; + this.statementId = statementId; } public long getMinTransaction() { @@ -236,6 +272,16 @@ public Path getPath() { return path.getPath(); } + public int getStatementId() { + return statementId == -1 ? 0 : statementId; + } + + /** + * Compactions (Major/Minor) merge deltas/bases but delete of old files + * happens in a different process; thus it's possible to have bases/deltas with + * overlapping txnId boundaries. 
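For reference, the naming scheme can be reproduced with a small standalone sketch; the constants mirror DELTA_DIGITS and STATEMENT_DIGITS above, and the class name is illustrative rather than part of the patch:

public class DeltaNameSketch {
  static final String DELTA_PREFIX = "delta_";
  static final String DELTA_DIGITS = "%07d";      // transaction ids, zero-padded to 7
  static final String STATEMENT_DIGITS = "%04d";  // statement ids 0..9999 within a txn

  // pre-1.3.x name: transaction range only
  static String deltaSubdir(long min, long max) {
    return DELTA_PREFIX + String.format(DELTA_DIGITS, min)
        + "_" + String.format(DELTA_DIGITS, max);
  }

  // 1.3.x name: per-statement delta inside a multi-statement transaction
  static String deltaSubdir(long min, long max, int statementId) {
    return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
  }

  public static void main(String[] args) {
    System.out.println(deltaSubdir(17, 17, 0));   // delta_0000017_0000017_0000
    System.out.println(deltaSubdir(17, 17, 1));   // delta_0000017_0000017_0001
    System.out.println(deltaSubdir(17, 17));      // delta_0000017_0000017
  }
}

Minor compaction collapses the per-statement deltas of a transaction back into the two-part name, which is why createFilename treats statementId == -1 as the old-style layout.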
The sort order helps figure out the "best" set of files + * to use to get data. + */ @Override public int compareTo(ParsedDelta parsedDelta) { if (minTransaction != parsedDelta.minTransaction) { @@ -250,7 +296,22 @@ public int compareTo(ParsedDelta parsedDelta) { } else { return -1; } - } else { + } + else if(statementId != parsedDelta.statementId) { + /** + * We want deltas after minor compaction (w/o statementId) to sort + * earlier so that getAcidState() considers compacted files (into larger ones) obsolete + * Before compaction, include deltas with all statementIds for a given txnId + * in a {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory} + */ + if(statementId < parsedDelta.statementId) { + return -1; + } + else { + return 1; + } + } + else { return path.compareTo(parsedDelta.path); } } @@ -277,40 +338,71 @@ public int compareTo(ParsedDelta parsedDelta) { */ public static List serializeDeltas(List deltas) { List result = new ArrayList(deltas.size() * 2); + ParsedDelta previous = null; for(ParsedDelta delta: deltas) { + if(previous != null && previous.minTransaction == delta.minTransaction && previous.maxTransaction == delta.maxTransaction) { + previous = delta; + continue; + } result.add(delta.minTransaction); result.add(delta.maxTransaction); + previous = delta; } return result; } /** * Convert the list of begin/end transaction id pairs to a list of delta - * directories. + * directories. Note that there may be multiple delta files for the exact same txn range starting + * with 1.3.x; + * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)} * @param root the root directory * @param deltas list of begin/end transaction id pairs * @return the list of delta paths */ - public static Path[] deserializeDeltas(Path root, List deltas) { + public static Path[] deserializeDeltas(Path root, final List deltas, Configuration conf) throws IOException { int deltaSize = deltas.size() / 2; - Path[] result = new Path[deltaSize]; + List results = new ArrayList(); + FileSystem fs = root.getFileSystem(conf); for(int i = 0; i < deltaSize; ++i) { - result[i] = new Path(root, deltaSubdir(deltas.get(i * 2), - deltas.get(i * 2 + 1))); + final int index = i; + PathFilter pathFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(deltaSubdir(deltas.get(index * 2), + deltas.get(index * 2 + 1))); + } + }; + List deltaDirs = SHIMS.listLocatedStatus(fs, root, pathFilter); + for (FileStatus deltaDir : deltaDirs) { + results.add(deltaDir.getPath()); + } } - return result; + return results.toArray(new Path[results.size()]); } - static ParsedDelta parseDelta(FileStatus path) { - String filename = path.getPath().getName(); + private static ParsedDelta parseDelta(FileStatus path) { + ParsedDelta p = parsedDelta(path.getPath()); + return new ParsedDelta(p.getMinTransaction(), + p.getMaxTransaction(), path, p.statementId); + } + public static ParsedDelta parsedDelta(Path deltaDir) { + String filename = deltaDir.getName(); if (filename.startsWith(DELTA_PREFIX)) { String rest = filename.substring(DELTA_PREFIX.length()); int split = rest.indexOf('_'); + int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId long min = Long.parseLong(rest.substring(0, split)); - long max = Long.parseLong(rest.substring(split + 1)); - return new ParsedDelta(min, max, path); + long max = split2 == -1 ? 
+ Long.parseLong(rest.substring(split + 1)) : + Long.parseLong(rest.substring(split + 1, split2)); + if(split2 == -1) { + return new ParsedDelta(min, max, null); + } + int statementId = Integer.parseInt(rest.substring(split2 + 1)); + return new ParsedDelta(min, max, null, statementId); } - throw new IllegalArgumentException(path + " does not start with " + + throw new IllegalArgumentException(deltaDir + " does not start with " + DELTA_PREFIX); } @@ -407,15 +499,24 @@ public static Directory getAcidState(Path directory, Collections.sort(working); long current = bestBaseTxn; + int lastStmtId = -1; for(ParsedDelta next: working) { if (next.maxTransaction > current) { // are any of the new transactions ones that we care about? if (txnList.isTxnRangeValid(current+1, next.maxTransaction) != - ValidTxnList.RangeResponse.NONE) { + ValidTxnList.RangeResponse.NONE) { deltas.add(next); current = next.maxTransaction; + lastStmtId = next.statementId; } - } else { + } + else if(next.maxTransaction == current && lastStmtId >= 0) { + //make sure to get all deltas within a single transaction; multi-statement txn + //generate multiple delta files with the same txnId range + //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete + deltas.add(next); + } + else { obsolete.add(next.path); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index 7ad5aa0..50ba740 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -297,31 +297,32 @@ public static RecordUpdater getAcidRecordUpdater(JobConf jc, TableDesc tableInfo // TODO not 100% sure about this. This call doesn't set the compression type in the conf // file the way getHiveRecordWriter does, as ORC appears to read the value for itself. Not // sure if this is correct or not. 
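Going the other way, the two directory-name shapes handled by parsedDelta() can be decoded with a minimal sketch; the array return and the -1 convention for a missing statement id are illustrative shorthand for the ParsedDelta fields:

public class DeltaNameParseSketch {
  static final String DELTA_PREFIX = "delta_";

  /** @return {min, max, statementId}, with statementId == -1 for pre-1.3.x names */
  static long[] parse(String dirName) {
    if (!dirName.startsWith(DELTA_PREFIX)) {
      throw new IllegalArgumentException(dirName + " does not start with " + DELTA_PREFIX);
    }
    String rest = dirName.substring(DELTA_PREFIX.length());
    int split = rest.indexOf('_');
    int split2 = rest.indexOf('_', split + 1);   // -1 if there is no statement id
    long min = Long.parseLong(rest.substring(0, split));
    long max = split2 == -1
        ? Long.parseLong(rest.substring(split + 1))
        : Long.parseLong(rest.substring(split + 1, split2));
    long statementId = split2 == -1 ? -1 : Long.parseLong(rest.substring(split2 + 1));
    return new long[] {min, max, statementId};
  }

  public static void main(String[] args) {
    System.out.println(java.util.Arrays.toString(parse("delta_0000017_0000017_0001"))); // [17, 17, 1]
    System.out.println(java.util.Arrays.toString(parse("delta_0000017_0000019")));      // [17, 19, -1]
  }
}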
- return getRecordUpdater(jc, acidOutputFormat, conf.getCompressed(), conf.getTransactionId(), - bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum); + return getRecordUpdater(jc, acidOutputFormat, + bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum, conf); } private static RecordUpdater getRecordUpdater(JobConf jc, AcidOutputFormat acidOutputFormat, - boolean isCompressed, - long txnId, int bucket, ObjectInspector inspector, Properties tableProp, Path outPath, Reporter reporter, - int rowIdColNum) throws IOException { + int rowIdColNum, + FileSinkDesc conf) throws IOException { return acidOutputFormat.getRecordUpdater(outPath, new AcidOutputFormat.Options(jc) - .isCompressed(isCompressed) + .isCompressed(conf.getCompressed()) .tableProperties(tableProp) .reporter(reporter) .writingBase(false) - .minimumTransactionId(txnId) - .maximumTransactionId(txnId) + .minimumTransactionId(conf.getTransactionId()) + .maximumTransactionId(conf.getTransactionId()) .bucket(bucket) .inspector(inspector) - .recordIdColumn(rowIdColNum)); + .recordIdColumn(rowIdColNum) + .statementId(conf.getStatementId()) + .finalDestination(conf.getDestPath())); } public static PartitionDesc getPartitionDescFromPathRecursively( diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 5d6c9da..9d41ab7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -1196,7 +1196,7 @@ public float getProgress() throws IOException { } else { root = path; } - final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas()); + final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas(), options.getConfiguration()); final Configuration conf = options.getConfiguration(); final Reader reader; final int bucket; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 728118a..2f11611 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -72,41 +72,55 @@ /** * A RecordIdentifier extended with the current transaction id. This is the * key of our merge sort with the originalTransaction, bucket, and rowId - * ascending and the currentTransaction descending. This means that if the + * ascending and the currentTransaction, statementId descending. This means that if the * reader is collapsing events to just the last update, just the first * instance of each record is required. 
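A toy model of that ordering, with a simplified key class (names are illustrative; the real comparison is ReaderKey.compareTo below):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ReaderKeyOrderSketch {
  static final class Key {
    final long origTxn; final int bucket; final long rowId;
    final long currentTxn; final int stmtId;
    Key(long o, int b, long r, long c, int s) {
      origTxn = o; bucket = b; rowId = r; currentTxn = c; stmtId = s;
    }
    @Override public String toString() {
      return "currentTxn=" + currentTxn + ", stmtId=" + stmtId;
    }
  }

  public static void main(String[] args) {
    Comparator<Key> order = Comparator
        .comparingLong((Key k) -> k.origTxn)
        .thenComparingInt(k -> k.bucket)
        .thenComparingLong(k -> k.rowId)
        .thenComparing(Comparator.comparingLong((Key k) -> k.currentTxn).reversed())
        .thenComparing(Comparator.comparingInt((Key k) -> k.stmtId).reversed());

    List<Key> events = new ArrayList<>();
    events.add(new Key(3, 0, 42, 3, 0));   // original insert in txn 3
    events.add(new Key(3, 0, 42, 5, 0));   // update in txn 5, statement 0
    events.add(new Key(3, 0, 42, 5, 2));   // later update in txn 5, statement 2
    events.sort(order);
    System.out.println(events);            // txn 5/stmt 2 first, then txn 5/stmt 0, then txn 3
  }
}

Because currentTransactionId and statementId sort descending after the row coordinates, the first key the merger sees for a given row is its newest version, which is what makes collapsing to the last update cheap.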
*/ final static class ReaderKey extends RecordIdentifier{ private long currentTransactionId; + private int statementId;//sort on this descending, like currentTransactionId public ReaderKey() { - this(-1, -1, -1, -1); + this(-1, -1, -1, -1, 0); } public ReaderKey(long originalTransaction, int bucket, long rowId, long currentTransactionId) { + this(originalTransaction, bucket, rowId, currentTransactionId, 0); + } + /** + * @param statementId - set this to 0 if N/A + */ + public ReaderKey(long originalTransaction, int bucket, long rowId, + long currentTransactionId, int statementId) { super(originalTransaction, bucket, rowId); this.currentTransactionId = currentTransactionId; + this.statementId = statementId; } @Override public void set(RecordIdentifier other) { super.set(other); currentTransactionId = ((ReaderKey) other).currentTransactionId; + statementId = ((ReaderKey) other).statementId; } public void setValues(long originalTransactionId, int bucket, long rowId, - long currentTransactionId) { + long currentTransactionId, + int statementId) { setValues(originalTransactionId, bucket, rowId); this.currentTransactionId = currentTransactionId; + this.statementId = statementId; } @Override public boolean equals(Object other) { return super.equals(other) && - currentTransactionId == ((ReaderKey) other).currentTransactionId; + currentTransactionId == ((ReaderKey) other).currentTransactionId + && statementId == ((ReaderKey) other).statementId//consistent with compareTo() + ; } @Override @@ -118,6 +132,9 @@ public int compareTo(RecordIdentifier other) { if (currentTransactionId != oth.currentTransactionId) { return currentTransactionId < oth.currentTransactionId ? +1 : -1; } + if(statementId != oth.statementId) { + return statementId < oth.statementId ? +1 : -1; + } } else { return -1; } @@ -125,6 +142,13 @@ public int compareTo(RecordIdentifier other) { return sup; } + /** + * This means 1 txn modified the same row more than once + */ + private boolean isSameRow(ReaderKey other) { + return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId; + } + public long getCurrentTransactionId() { return currentTransactionId; } @@ -142,7 +166,7 @@ public int compareRow(RecordIdentifier other) { public String toString() { return "{originalTxn: " + getTransactionId() + ", bucket: " + getBucketId() + ", row: " + getRowId() + ", currentTxn: " + - currentTransactionId + "}"; + currentTransactionId + ", statementId: "+ statementId + "}"; } } @@ -159,6 +183,7 @@ public String toString() { final ReaderKey key; final RecordIdentifier maxKey; final int bucket; + private final int statementId; /** * Create a reader that reads from the first key larger than minKey to any @@ -170,17 +195,19 @@ public String toString() { * @param maxKey only return keys less than or equal to maxKey if it is * non-null * @param options options to provide to read the rows. 
+ * @param statementId id of SQL statement within a transaction * @throws IOException */ ReaderPair(ReaderKey key, Reader reader, int bucket, RecordIdentifier minKey, RecordIdentifier maxKey, - ReaderImpl.Options options) throws IOException { + ReaderImpl.Options options, int statementId) throws IOException { this.reader = reader; this.key = key; this.maxKey = maxKey; this.bucket = bucket; // TODO use stripe statistics to jump over stripes recordReader = reader.rowsOptions(options); + this.statementId = statementId; // advance the reader until we reach the minimum key do { next(nextRecord); @@ -195,7 +222,8 @@ void next(OrcStruct next) throws IOException { key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord), OrcRecordUpdater.getBucket(nextRecord), OrcRecordUpdater.getRowId(nextRecord), - OrcRecordUpdater.getCurrentTransaction(nextRecord)); + OrcRecordUpdater.getCurrentTransaction(nextRecord), + statementId); // if this record is larger than maxKey, we need to stop if (maxKey != null && key.compareRow(maxKey) > 0) { @@ -223,7 +251,7 @@ int getColumns() { OriginalReaderPair(ReaderKey key, Reader reader, int bucket, RecordIdentifier minKey, RecordIdentifier maxKey, Reader.Options options) throws IOException { - super(key, reader, bucket, minKey, maxKey, options); + super(key, reader, bucket, minKey, maxKey, options, 0); } @Override @@ -263,7 +291,7 @@ void next(OrcStruct next) throws IOException { nextRecord.setFieldValue(OrcRecordUpdater.ROW, recordReader.next(OrcRecordUpdater.getRow(next))); } - key.setValues(0L, bucket, nextRowId, 0L); + key.setValues(0L, bucket, nextRowId, 0L, 0); if (maxKey != null && key.compareRow(maxKey) > 0) { if (LOG.isDebugEnabled()) { LOG.debug("key " + key + " > maxkey " + maxKey); @@ -415,7 +443,7 @@ private void discoverKeyBounds(Reader reader, this.offset = options.getOffset(); this.length = options.getLength(); this.validTxnList = validTxnList; - // modify the optins to reflect the event instead of the base row + // modify the options to reflect the event instead of the base row Reader.Options eventOptions = createEventOptions(options); if (reader == null) { baseReader = null; @@ -438,7 +466,7 @@ private void discoverKeyBounds(Reader reader, options); } else { pair = new ReaderPair(key, reader, bucket, minKey, maxKey, - eventOptions); + eventOptions, 0); } // if there is at least one record, put it in the map @@ -458,13 +486,14 @@ private void discoverKeyBounds(Reader reader, for(Path delta: deltaDirectory) { ReaderKey key = new ReaderKey(); Path deltaFile = AcidUtils.createBucketFile(delta, bucket); + AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta); FileSystem fs = deltaFile.getFileSystem(conf); long length = getLastFlushLength(fs, deltaFile); if (length != -1 && fs.exists(deltaFile)) { Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length)); ReaderPair deltaPair = new ReaderPair(key, deltaReader, bucket, minKey, - maxKey, eventOptions); + maxKey, eventOptions, deltaDir.getStatementId()); if (deltaPair.nextRecord != null) { readers.put(key, deltaPair); } @@ -580,9 +609,18 @@ public boolean next(RecordIdentifier recordIdentifier, continue; } + /*for multi-statement txns, you may have multiple events for the same + * row in the same (current) transaction. We want to collapse these to just the last one + * regardless whether we are minor compacting. Consider INSERT/UPDATE/UPDATE of the + * same row in the same txn. There is no benefit passing along anything except the last + * event. 
If we did want to pass it along, we'd have to include statementId in the row + * returned so that compaction could write it out or make minor minor compaction understand + * how to write out delta files in delta_xxx_yyy_stid format. There doesn't seem to be any + * value in this.*/ + boolean isSameRow = prevKey.isSameRow((ReaderKey)recordIdentifier); // if we are collapsing, figure out if this is a new row - if (collapse) { - keysSame = prevKey.compareRow(recordIdentifier) == 0; + if (collapse || isSameRow) { + keysSame = (collapse && prevKey.compareRow(recordIdentifier) == 0) || (isSameRow); if (!keysSame) { prevKey.set(recordIdentifier); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index b576496..c42b6c8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -89,6 +89,7 @@ private final IntWritable bucket = new IntWritable(); private final LongWritable rowId = new LongWritable(); private long insertedRows = 0; + private long rowIdOffset = 0; // This records how many rows have been inserted or deleted. It is separate from insertedRows // because that is monotonically increasing to give new unique row ids. private long rowCountDelta = 0; @@ -261,8 +262,53 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { item.setFieldValue(ORIGINAL_TRANSACTION, originalTransaction); item.setFieldValue(BUCKET, bucket); item.setFieldValue(ROW_ID, rowId); + Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options); + if (fs.exists(matchingBucket)) { + //This is a sanity check. If this were to happen, data from previous write in the same + //txn would be lost + String msg = "Writing to " + path + " for minTxnId=" + options.getMinimumTransactionId() + + " maxTxnId=" + options.getMaximumTransactionId() + " stmtId=" + options.getStatementId() + + " but delta with this stmtId already exists: " + matchingBucket; + LOG.error(msg); + throw new IllegalStateException(msg); + } } + /** + * To handle multiple INSERT... statements in a single transaction, we want to make sure + * to generate unique {@code rowId} for all inserted rows of the transaction. + * @return largest rowId created by previous statements (maybe 0) + * @throws IOException + */ + private long findRowIdOffsetForInsert() throws IOException { + /* + * 1. need to know bucket we are writing to + * 2. need to know which delta dir it's in + * Then, + * 1. find the same bucket file in previous delta dir for this txn + * 2. read the footer and get AcidStats which has insert count + * 2.1 if AcidStats.inserts>0 done + * else go to previous delta file + * For example, consider insert/update/insert case...*/ + if(options.getStatementId() <= 0) { + return 0;//there is only 1 statement in this transaction (so far) + } + for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) { + Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt)); + if(!fs.exists(matchingBucket)) { + continue; + } + Reader reader = OrcFile.createReader(matchingBucket, OrcFile.readerOptions(options.getConfiguration())); + //no close() on Reader?! 
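The loop above scans earlier statements' bucket files in this transaction and uses the first non-zero insert count it finds as the offset for this statement's row ids. A toy version of that rule, with an in-memory map standing in for reading AcidStats from each per-statement delta (names are illustrative):

import java.util.HashMap;
import java.util.Map;

public class RowIdOffsetSketch {
  static long findRowIdOffsetForInsert(Map<Integer, Long> insertsByStatement, int statementId) {
    if (statementId <= 0) {
      return 0;                            // first (or only) statement in this txn so far
    }
    for (int past = statementId - 1; past >= 0; past--) {
      Long inserts = insertsByStatement.get(past);
      if (inserts != null && inserts > 0) {
        return inserts;                    // offset this statement's row ids by that count
      }
    }
    return 0;                              // earlier statements inserted nothing
  }

  public static void main(String[] args) {
    Map<Integer, Long> insertsByStatement = new HashMap<>();
    insertsByStatement.put(0, 3L);         // statement 0 inserted rows 0, 1, 2
    insertsByStatement.put(1, 0L);         // statement 1 was an update/delete, no inserts
    // statement 2 therefore starts numbering its inserted rows at 3
    System.out.println(findRowIdOffsetForInsert(insertsByStatement, 2));  // 3
  }
}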
+ AcidStats acidStats = parseAcidStats(reader); + if(acidStats.inserts > 0) { + return acidStats.inserts; + } + } + //if we got here, we looked at all delta files in this txn, prior to current statement and didn't + //find any inserts... + return 0; + } // Find the record identifier column (if there) and return a possibly new ObjectInspector that // will strain out the record id for the underlying writer. private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) { @@ -304,6 +350,9 @@ private void addEvent(int operation, long currentTransaction, long rowId, Object recIdInspector.getStructFieldData(rowIdValue, originalTxnField)); rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField)); } + else if(operation == INSERT_OPERATION) { + rowId += rowIdOffset; + } this.rowId.set(rowId); this.originalTransaction.set(originalTransaction); item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row)); @@ -315,6 +364,9 @@ private void addEvent(int operation, long currentTransaction, long rowId, Object public void insert(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; + //this method is almost no-op in hcatalog.streaming case since statementId == 0 is + //always true in that case + rowIdOffset = findRowIdOffsetForInsert(); } addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); rowCountDelta++; @@ -407,6 +459,22 @@ Writer getWriter() { } return result; } + /** + * {@link KeyIndexBuilder} creates these + */ + static AcidStats parseAcidStats(Reader reader) { + String statsSerialized; + try { + ByteBuffer val = + reader.getMetadataValue(OrcRecordUpdater.ACID_STATS) + .duplicate(); + statsSerialized = utf8Decoder.decode(val).toString(); + } catch (CharacterCodingException e) { + throw new IllegalArgumentException("Bad string encoding for " + + OrcRecordUpdater.ACID_STATS, e); + } + return new AcidStats(statsSerialized); + } static class KeyIndexBuilder implements OrcFile.WriterCallback { StringBuilder lastKey = new StringBuilder(); diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index f8fff1a..bd7f6ec 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -21,7 +21,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.LockComponentBuilder; import org.apache.hadoop.hive.metastore.LockRequestBuilder; @@ -52,6 +51,14 @@ private DbLockManager lockMgr = null; private IMetaStoreClient client = null; private long txnId = 0; + /** + * assigns a unique monotonically increasing ID to each statement + * which is part of an open transaction. 
This is used by storage + * layer (see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}) + * to keep apart multiple writes of the same data within the same transaction + * Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options} + */ + private int statementId = -1; DbTxnManager() { } @@ -67,13 +74,16 @@ void setHiveConf(HiveConf conf) { @Override public long openTxn(String user) throws LockException { init(); + if(isTxnOpen()) { + throw new LockException("Transaction already opened. txnId=" + txnId);//ToDo: ErrorMsg + } try { txnId = client.openTxn(user); + statementId = 0; LOG.debug("Opened txn " + txnId); return txnId; } catch (TException e) { - throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), - e); + throw new LockException(e, ErrorMsg.METASTORE_COMMUNICATION_FAILED); } } @@ -222,7 +232,10 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB return null; } - List locks = new ArrayList(1); + List locks = new ArrayList(1); + if(isTxnOpen()) { + statementId++; + } LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), isBlocking, locks); ctx.setHiveLocks(locks); return lockState; @@ -230,9 +243,8 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB @Override public void commitTxn() throws LockException { - if (txnId == 0) { - throw new RuntimeException("Attempt to commit before opening a " + - "transaction"); + if (!isTxnOpen()) { + throw new RuntimeException("Attempt to commit before opening a transaction"); } try { lockMgr.clearLocalLockRecords(); @@ -249,14 +261,14 @@ public void commitTxn() throws LockException { e); } finally { txnId = 0; + statementId = -1; } } @Override public void rollbackTxn() throws LockException { - if (txnId == 0) { - throw new RuntimeException("Attempt to rollback before opening a " + - "transaction"); + if (!isTxnOpen()) { + throw new RuntimeException("Attempt to rollback before opening a transaction"); } try { lockMgr.clearLocalLockRecords(); @@ -270,6 +282,7 @@ public void rollbackTxn() throws LockException { e); } finally { txnId = 0; + statementId = -1; } } @@ -278,7 +291,7 @@ public void heartbeat() throws LockException { LOG.debug("Heartbeating lock and transaction " + txnId); List locks = lockMgr.getLocks(false, false); if (locks.size() == 0) { - if (txnId == 0) { + if (!isTxnOpen()) { // No locks, no txn, we outta here. 
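A minimal model of the txnId/statementId bookkeeping added to DbTxnManager, assuming simplified method signatures with no locks or metastore calls:

public class StatementIdLifecycleSketch {
  private long txnId = 0;
  private int statementId = -1;
  private long nextTxnId = 1;               // stands in for the metastore-assigned id

  boolean isTxnOpen() { return txnId > 0; }

  long openTxn() {
    if (isTxnOpen()) {
      throw new IllegalStateException("Transaction already opened. txnId=" + txnId);
    }
    txnId = nextTxnId++;
    statementId = 0;                        // counter restarts for each transaction
    return txnId;
  }

  void acquireLocks() {                     // called once per statement
    if (isTxnOpen()) {
      statementId++;
    }
  }

  void commitTxn() {                        // rollbackTxn() clears the same state
    txnId = 0;
    statementId = -1;
  }

  public static void main(String[] args) {
    StatementIdLifecycleSketch mgr = new StatementIdLifecycleSketch();
    mgr.openTxn();
    mgr.acquireLocks();                     // statement 1
    mgr.acquireLocks();                     // statement 2
    System.out.println(mgr.statementId);    // 2
    mgr.commitTxn();
    System.out.println(mgr.isTxnOpen());    // false
  }
}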
return; } else { @@ -336,7 +349,7 @@ public boolean supportsAcid() { @Override protected void destruct() { try { - if (txnId > 0) rollbackTxn(); + if (isTxnOpen()) rollbackTxn(); if (lockMgr != null) lockMgr.close(); } catch (Exception e) { LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage() @@ -361,5 +374,16 @@ private void init() throws LockException { } } } - + @Override + public boolean isTxnOpen() { + return txnId > 0; + } + @Override + public long getCurrentTxnId() { + return txnId; + } + @Override + public int getStatementId() { + return statementId; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index 21ab8ee..be5a593 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -52,8 +52,20 @@ public long openTxn(String user) throws LockException { // No-op return 0L; } + @Override + public boolean isTxnOpen() { + return false; + } + @Override + public long getCurrentTxnId() { + return 0L; + } @Override + public int getStatementId() { + return 0; + } + @Override public HiveLockManager getLockManager() throws LockException { if (lockMgr == null) { boolean supportConcurrency = diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index 2dd0c7d..69f337a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.QueryPlan; @@ -127,4 +126,28 @@ * @return true if this transaction manager does ACID */ boolean supportsAcid(); + + /** + * This behaves exactly as + * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean) + */ + void setAutoCommit(boolean autoCommit) throws LockException; + + /** + * This behaves exactly as + * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#getAutoCommit() + */ + boolean getAutoCommit(); + + boolean isTxnOpen(); + /** + * if {@code isTxnOpen()}, returns the currently active transaction ID + */ + long getCurrentTxnId(); + + /** + * 0..N Id of current statement within currently opened transaction + */ + int getStatementId(); + } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java index eccb8d1..5560ab5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java @@ -27,6 +27,7 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager { protected HiveConf conf; + private boolean isAutoCommit = true;//true by default per JDBC spec void setHiveConf(HiveConf c) { conf = c; @@ -43,5 +44,14 @@ public void closeTxnManager() { protected void finalize() throws Throwable { destruct(); } + @Override + public void setAutoCommit(boolean autoCommit) throws LockException { + isAutoCommit = autoCommit; + } + + @Override + public boolean getAutoCommit() { + return isAutoCommit; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java 
ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java index 9894a70..8ea457e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.lockmgr; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.metadata.HiveException; /** @@ -43,5 +44,10 @@ public LockException(Throwable cause) { public LockException(String message, Throwable cause) { super(message, cause); } - + public LockException(Throwable cause, ErrorMsg errorMsg, String... msgArgs) { + super(cause, errorMsg, msgArgs); + } + public LockException(Throwable cause, ErrorMsg errorMsg) { + super(cause, errorMsg); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java index 1d895ca..d017705 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java @@ -60,6 +60,9 @@ public HiveException(Throwable cause, ErrorMsg errorMsg, String... msgArgs) { canonicalErrorMsg = errorMsg; } + public HiveException(Throwable cause, ErrorMsg errorMsg) { + this(cause, errorMsg, new String[0]); + } /** * @return {@link ErrorMsg#GENERIC_ERROR} by default */ diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index d72991f..fbe93f9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -125,6 +125,19 @@ * Columns accessed by updates */ protected ColumnAccessInfo updateColumnAccessInfo; + /** + * the value of set autocommit true|false + * It's an object to make sure it's {@code null} if the parsed statement is + * not 'set autocommit...' + */ + private Boolean autoCommitValue; + + public Boolean getAutoCommitValue() { + return autoCommitValue; + } + void setAutoCommitValue(Boolean autoCommit) { + autoCommitValue = autoCommit; + } public boolean skipAuthorization() { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g index bdd7cb7..c93d997 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g +++ ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g @@ -306,6 +306,15 @@ KW_DAY: 'DAY'; KW_HOUR: 'HOUR'; KW_MINUTE: 'MINUTE'; KW_SECOND: 'SECOND'; +KW_START_TRANSACTION: 'START TRANSACTION'; +KW_COMMIT: 'COMMIT'; +KW_ROLLBACK: 'ROLLBACK'; +KW_WORK: 'WORK'; +KW_READ_ONLY: 'READ ONLY'; +KW_READ_WRITE: 'READ WRITE'; +KW_ISOLATION_LEVEL: 'ISOLATION LEVEL'; +KW_ISOLATION_SNAPSHOT: 'SNAPSHOT'; +KW_AUTOCOMMIT: 'AUTOCOMMIT'; // Operators // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work. 
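Together with the parser rules in the next hunk, these keywords admit statements of the following shape. A hedged JDBC-style sketch; the connection URL, the table name, and the assumption that the target endpoint routes these strings through this grammar are illustrative, not part of the patch:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class TxnSyntaxSketch {
  public static void main(String[] args) throws Exception {
    // assumes the Hive JDBC driver is on the classpath
    try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         Statement stmt = conn.createStatement()) {
      stmt.execute("SET AUTOCOMMIT FALSE");
      stmt.execute("START TRANSACTION READ WRITE, ISOLATION LEVEL SNAPSHOT");
      stmt.execute("INSERT INTO acid_tbl VALUES (1, 'a')");   // acid_tbl is hypothetical
      stmt.execute("INSERT INTO acid_tbl VALUES (2, 'b')");   // second statement, second delta
      stmt.execute("COMMIT WORK");                            // WORK is optional; ROLLBACK [WORK] likewise
    }
  }
}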
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index 15f1f11..bcc8348 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -356,6 +356,15 @@ TOK_ANONYMOUS; TOK_COL_NAME; TOK_URI_TYPE; TOK_SERVER_TYPE; +TOK_START_TRANSACTION; +TOK_ISOLATION_LEVEL; +TOK_ISOLATION_SNAPSHOT; +TOK_TXN_ACCESS_MODE; +TOK_TXN_READ_ONLY; +TOK_TXN_READ_WRITE; +TOK_COMMIT; +TOK_ROLLBACK; +TOK_SET_AUTOCOMMIT; } @@ -377,6 +386,7 @@ import org.apache.hadoop.hive.conf.HiveConf; private static HashMap xlateMap; static { + //this is used to support auto completion in CLI xlateMap = new HashMap(); // Keywords @@ -695,6 +705,7 @@ execStatement | ddlStatement | deleteStatement | updateStatement + | sqlTransactionStatement ; loadStatement @@ -2395,3 +2406,61 @@ updateStatement : KW_UPDATE tableName setColumnsClause whereClause? -> ^(TOK_UPDATE_TABLE tableName setColumnsClause whereClause?) ; + +/* +BEGIN user defined transaction boundaries +*/ +sqlTransactionStatement +@init { pushMsg("transaction statement", state); } +@after { popMsg(state); } + : + startTransactionStatement + | commitStatement + | rollbackStatement + | setAutoCommitStatement + ; + +startTransactionStatement + : + KW_START_TRANSACTION ( transactionMode ( COMMA transactionMode )* )? -> ^(TOK_START_TRANSACTION transactionMode*) + ; + +transactionMode + : + isolationLevel + | transactionAccessMode -> ^(TOK_TXN_ACCESS_MODE transactionAccessMode) + ; + +transactionAccessMode + : + KW_READ_ONLY -> TOK_TXN_READ_ONLY + | KW_READ_WRITE -> TOK_TXN_READ_WRITE + ; + +isolationLevel + : + KW_ISOLATION_LEVEL levelOfIsolation -> ^(TOK_ISOLATION_LEVEL levelOfIsolation) + ; + +/*READ UNCOMMITTED | READ COMMITTED | REPEATABLE READ | SERIALIZABLE may be supported later*/ +levelOfIsolation + : + KW_ISOLATION_SNAPSHOT -> TOK_ISOLATION_SNAPSHOT + ; + +commitStatement + : + KW_COMMIT ( KW_WORK )? -> TOK_COMMIT + ; + +rollbackStatement + : + KW_ROLLBACK ( KW_WORK )? -> TOK_ROLLBACK + ; +setAutoCommitStatement + : + KW_SET KW_AUTOCOMMIT booleanValueTok -> ^(TOK_SET_AUTOCOMMIT booleanValueTok) + ; +/* +END user defined transaction boundaries +*/ diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g index 4f8be52..ff7e803 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g +++ ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g @@ -499,6 +499,12 @@ booleanValue KW_TRUE^ | KW_FALSE^ ; +booleanValueTok + : + KW_TRUE -> TOK_TRUE + | KW_FALSE -> TOK_FALSE + ; + tableOrPartition : tableName partitionSpec? -> ^(TOK_TAB tableName partitionSpec?) @@ -629,7 +635,16 @@ nonReserved | KW_STREAMTABLE | KW_STRING | KW_STRUCT | KW_TABLES | KW_TBLPROPERTIES | KW_TEMPORARY | KW_TERMINATED | KW_TINYINT | KW_TOUCH | KW_TRANSACTIONS | KW_UNARCHIVE | KW_UNDO | KW_UNIONTYPE | KW_UNLOCK | KW_UNSET | KW_UNSIGNED | KW_URI | KW_USE | KW_UTC | KW_UTCTIMESTAMP | KW_VALUE_TYPE | KW_VIEW | KW_WHILE | KW_YEAR - ; + | KW_WORK + | KW_START_TRANSACTION + | KW_COMMIT + | KW_ROLLBACK + | KW_READ_ONLY + | KW_READ_WRITE + | KW_ISOLATION_LEVEL + | KW_ISOLATION_SNAPSHOT + | KW_AUTOCOMMIT +; //The following SQL2011 reserved keywords are used as cast function name only, it is a subset of the sql11ReservedKeywordsUsedAsIdentifier. 
sql11ReservedKeywordsUsedAsCastFunctionName diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 58ee605..54054ab 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6601,7 +6601,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), - dpCtx); + dpCtx, + dest_path); // If this is an insert, update, or delete on an ACID table then mark that so the // FileSinkOperator knows how to properly write to it. @@ -10046,6 +10047,25 @@ boolean genResolvedParseTree(ASTNode ast, PlannerContext plannerCtx) throws Sema viewsExpanded.add(createVwDesc.getViewName()); } + switch(ast.getToken().getType()) { + case HiveParser.TOK_SET_AUTOCOMMIT: + assert ast.getChildCount() == 1; + if(ast.getChild(0).getType() == HiveParser.TOK_TRUE) { + setAutoCommitValue(true); + } + else if(ast.getChild(0).getType() == HiveParser.TOK_FALSE) { + setAutoCommitValue(false); + } + else { + assert false : "Unexpected child of TOK_SET_AUTOCOMMIT: " + ast.getChild(0).getType(); + } + //fall through + case HiveParser.TOK_START_TRANSACTION: + case HiveParser.TOK_COMMIT: + case HiveParser.TOK_ROLLBACK: + SessionState.get().setCommandType(SemanticAnalyzerFactory.getOperation(ast.getToken().getType())); + return false; + } // 4. continue analyzing from the child ASTNode. Phase1Ctx ctx_1 = initPhase1Ctx(); preProcessForInsert(child, qb); @@ -10156,7 +10176,7 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticExce } // 6. Generate table access stats if required - if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS) == true) { + if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS)) { TableAccessAnalyzer tableAccessAnalyzer = new TableAccessAnalyzer(pCtx); setTableAccessInfo(tableAccessAnalyzer.analyzeTableAccess()); } @@ -10179,7 +10199,7 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticExce boolean isColumnInfoNeedForAuth = SessionState.get().isAuthorizationModeV2() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED); if (isColumnInfoNeedForAuth - || HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS) == true) { + || HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) { ColumnAccessAnalyzer columnAccessAnalyzer = new ColumnAccessAnalyzer(pCtx); setColumnAccessInfo(columnAccessAnalyzer.analyzeColumnAccess()); } @@ -10669,7 +10689,7 @@ public RowResolver getRowResolver(Operator opt) { * Add default properties for table property. If a default parameter exists * in the tblProp, the value in tblProp will be kept. 
* - * @param table + * @param tblProp * property map * @return Modified table property map */ diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index 97d02ea..9b8748f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -59,6 +59,7 @@ commandType.put(HiveParser.TOK_ALTERTABLE_UNARCHIVE, HiveOperation.ALTERTABLE_UNARCHIVE); commandType.put(HiveParser.TOK_ALTERTABLE_PROPERTIES, HiveOperation.ALTERTABLE_PROPERTIES); commandType.put(HiveParser.TOK_ALTERTABLE_DROPPROPERTIES, HiveOperation.ALTERTABLE_PROPERTIES); + commandType.put(HiveParser.TOK_ALTERTABLE_EXCHANGEPARTITION, HiveOperation.ALTERTABLE_EXCHANGEPARTITION); commandType.put(HiveParser.TOK_SHOWDATABASES, HiveOperation.SHOWDATABASES); commandType.put(HiveParser.TOK_SHOWTABLES, HiveOperation.SHOWTABLES); commandType.put(HiveParser.TOK_SHOWCOLUMNS, HiveOperation.SHOWCOLUMNS); @@ -111,6 +112,10 @@ commandType.put(HiveParser.TOK_ALTERTABLE_PARTCOLTYPE, HiveOperation.ALTERTABLE_PARTCOLTYPE); commandType.put(HiveParser.TOK_SHOW_COMPACTIONS, HiveOperation.SHOW_COMPACTIONS); commandType.put(HiveParser.TOK_SHOW_TRANSACTIONS, HiveOperation.SHOW_TRANSACTIONS); + commandType.put(HiveParser.TOK_START_TRANSACTION, HiveOperation.START_TRANSACTION); + commandType.put(HiveParser.TOK_COMMIT, HiveOperation.COMMIT); + commandType.put(HiveParser.TOK_ROLLBACK, HiveOperation.ROLLBACK); + commandType.put(HiveParser.TOK_SET_AUTOCOMMIT, HiveOperation.SET_AUTOCOMMIT); } static { @@ -273,7 +278,11 @@ public static BaseSemanticAnalyzer get(HiveConf conf, ASTNode tree) case HiveParser.TOK_UPDATE_TABLE: case HiveParser.TOK_DELETE_FROM: return new UpdateDeleteSemanticAnalyzer(conf); - + + case HiveParser.TOK_START_TRANSACTION: + case HiveParser.TOK_COMMIT: + case HiveParser.TOK_ROLLBACK: + case HiveParser.TOK_SET_AUTOCOMMIT: default: { SemanticAnalyzer semAnalyzer = HiveConf .getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_ENABLED) ? new CalcitePlanner(conf) @@ -293,4 +302,7 @@ private static void setSessionCommandType(HiveOperation commandType) { private SemanticAnalyzerFactory() { // prevent instantiation } + static HiveOperation getOperation(int hiveParserToken) { + return commandType.get(hiveParserToken); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index bb6cee5..f73b502 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -92,16 +92,21 @@ // Record what type of write this is. Default is non-ACID (ie old style). 
private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID; private long txnId = 0; // transaction id for this operation + private int statementId = -1; private transient Table table; + private Path destPath; public FileSinkDesc() { } + /** + * @param destPath - the final destination for data + */ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, final boolean compressed, final int destTableId, final boolean multiFileSpray, final boolean canBeMerged, final int numFiles, final int totalFiles, - final ArrayList partitionCols, final DynamicPartitionCtx dpCtx) { + final ArrayList partitionCols, final DynamicPartitionCtx dpCtx, Path destPath) { this.dirName = dirName; this.tableInfo = tableInfo; @@ -114,6 +119,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, this.partitionCols = partitionCols; this.dpCtx = dpCtx; this.dpSortState = DPSortState.NONE; + this.destPath = destPath; } public FileSinkDesc(final Path dirName, final TableDesc tableInfo, @@ -135,7 +141,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, public Object clone() throws CloneNotSupportedException { FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed, destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles, - partitionCols, dpCtx); + partitionCols, dpCtx, destPath); ret.setCompressCodec(compressCodec); ret.setCompressType(compressType); ret.setGatherStats(gatherStats); @@ -231,9 +237,6 @@ public boolean isTemporary() { return temporary; } - /** - * @param totalFiles the totalFiles to set - */ public void setTemporary(boolean temporary) { this.temporary = temporary; } @@ -438,11 +441,23 @@ public void setWriteType(AcidUtils.Operation type) { public void setTransactionId(long id) { txnId = id; } - public long getTransactionId() { return txnId; } + public void setStatementId(int id) { + statementId = id; + } + /** + * See {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options#statementId(int)} + */ + public int getStatementId() { + return statementId; + } + public Path getDestPath() { + return destPath; + } + public Table getTable() { return table; } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java index 75cdf16..a2c60eb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java @@ -102,7 +102,7 @@ CREATETABLE("CREATETABLE", null, new Privilege[]{Privilege.CREATE}), TRUNCATETABLE("TRUNCATETABLE", null, new Privilege[]{Privilege.DROP}), CREATETABLE_AS_SELECT("CREATETABLE_AS_SELECT", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.CREATE}), - QUERY("QUERY", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.ALTER_DATA, Privilege.CREATE}), + QUERY("QUERY", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.ALTER_DATA, Privilege.CREATE}, true, false), ALTERINDEX_PROPS("ALTERINDEX_PROPS",null, null), ALTERDATABASE("ALTERDATABASE", null, null), ALTERDATABASE_OWNER("ALTERDATABASE_OWNER", null, null), @@ -113,11 +113,18 @@ ALTERTBLPART_SKEWED_LOCATION("ALTERTBLPART_SKEWED_LOCATION", new Privilege[] {Privilege.ALTER_DATA}, null), ALTERTABLE_PARTCOLTYPE("ALTERTABLE_PARTCOLTYPE", new Privilege[] { Privilege.SELECT }, new Privilege[] { Privilege.ALTER_DATA }), + ALTERTABLE_EXCHANGEPARTITION("ALTERTABLE_EXCHANGEPARTITION", null, null), ALTERVIEW_RENAME("ALTERVIEW_RENAME", new Privilege[] {Privilege.ALTER_METADATA}, null), ALTERVIEW_AS("ALTERVIEW_AS", 
new Privilege[] {Privilege.ALTER_METADATA}, null), ALTERTABLE_COMPACT("ALTERTABLE_COMPACT", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.ALTER_DATA}), SHOW_COMPACTIONS("SHOW COMPACTIONS", null, null), - SHOW_TRANSACTIONS("SHOW TRANSACTIONS", null, null); + SHOW_TRANSACTIONS("SHOW TRANSACTIONS", null, null), + + //todo: do these need Privilege? + START_TRANSACTION("START TRANSACTION", null, null, false, true), + COMMIT("COMMIT", null, null, true, true), + ROLLBACK("ROLLBACK", null, null, true, true), + SET_AUTOCOMMIT("SET AUTOCOMMIT", null, null, true, false); ; private String operationName; @@ -126,6 +133,12 @@ private Privilege[] outputRequiredPrivileges; + /** + * Some operations should not be allowed inside an open transaction. For example, DDL operations. + */ + private boolean allowedInTransaction = false; + private boolean disallowedInAutoCommit = false; + public Privilege[] getInputRequiredPrivileges() { return inputRequiredPrivileges; } @@ -137,12 +150,26 @@ public String getOperationName() { return operationName; } - + + public boolean isAllowedInTransaction() { + return allowedInTransaction; + } + public boolean isDisallowedInAutoCommit() { + return disallowedInAutoCommit; + } + private HiveOperation(String operationName, - Privilege[] inputRequiredPrivileges, Privilege[] outputRequiredPrivileges) { + Privilege[] inputRequiredPrivileges, Privilege[] outputRequiredPrivileges) { + this(operationName, inputRequiredPrivileges, outputRequiredPrivileges, false, false); + } + private HiveOperation(String operationName, + Privilege[] inputRequiredPrivileges, Privilege[] outputRequiredPrivileges, + boolean allowedInTransaction, boolean disallowedInAutoCommit) { this.operationName = operationName; this.inputRequiredPrivileges = inputRequiredPrivileges; this.outputRequiredPrivileges = outputRequiredPrivileges; + this.allowedInTransaction = allowedInTransaction; + this.disallowedInAutoCommit = disallowedInAutoCommit; } public static class PrivilegeAgreement { diff --git ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java index 4584517..455616f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java +++ ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java @@ -19,16 +19,19 @@ package org.apache.hadoop.hive.ql.processors; import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.ErrorMsg; /** * Encapsulates the basic response info returned by classes the implement the * CommandProcessor interface. Typically errorMessage * and SQLState will only be set if the responseCode - * is not 0. + * is not 0. Note that {@code responseCode} often ends up as the exit value of + * the command shell process, so it should be kept below 127. */ public class CommandProcessorResponse { private final int responseCode; private final String errorMessage; + private final int errorCode; private final String SQLState; private final Schema resSchema; @@ -49,6 +52,10 @@ public CommandProcessorResponse(int responseCode, String errorMessage, String SQ public CommandProcessorResponse(int responseCode, String errorMessage, String SQLState, Schema schema) { this(responseCode, errorMessage, SQLState, schema, null); } + public CommandProcessorResponse(int responseCode, ErrorMsg canonicalErrMsg, Throwable t, String ...
msgArgs) { + this(responseCode, canonicalErrMsg.format(msgArgs), + canonicalErrMsg.getSQLState(), null, t, canonicalErrMsg.getErrorCode()); + } /** * Create CommandProcessorResponse object indicating an error. @@ -63,12 +70,17 @@ public static CommandProcessorResponse create(Exception e) { } public CommandProcessorResponse(int responseCode, String errorMessage, String SQLState, - Schema schema, Throwable exception) { + Schema schema, Throwable exception) { + this(responseCode, errorMessage, SQLState, schema, exception, -1); + } + public CommandProcessorResponse(int responseCode, String errorMessage, String SQLState, + Schema schema, Throwable exception, int errorCode) { this.responseCode = responseCode; this.errorMessage = errorMessage; this.SQLState = SQLState; this.resSchema = schema; this.exception = exception; + this.errorCode = errorCode; } public int getResponseCode() { return responseCode; } @@ -76,8 +88,11 @@ public CommandProcessorResponse(int responseCode, String errorMessage, String SQ public String getSQLState() { return SQLState; } public Schema getSchema() { return resSchema; } public Throwable getException() { return exception; } + public int getErrorCode() { return errorCode; } public String toString() { - return "(" + responseCode + "," + errorMessage + "," + SQLState + + return "(" + responseCode + "," + errorMessage + "," + + (errorCode > 0 ? errorCode + "," : "" ) + + SQLState + (resSchema == null ? "" : ",") + (exception == null ? "" : exception.getMessage()) + ")"; } diff --git ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java index 319a79b..c8c9831 100644 --- ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java +++ ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java @@ -75,6 +75,9 @@ public static HiveCommand find(String[] command, boolean findOnlyForTesting) { } else if(command.length > 1 && "from".equalsIgnoreCase(command[1])) { //special handling for SQL "delete from where..." return null; + } + else if(command.length > 1 && "set".equalsIgnoreCase(command[0]) && "autocommit".equalsIgnoreCase(command[1])) { + return null;//don't want set autocommit true|false to get mixed with set hive.foo.bar... 
} else if (COMMANDS.contains(cmd)) { HiveCommand hiveCommand = HiveCommand.valueOf(cmd); diff --git ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java index b974b59..71be469 100644 --- ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java +++ ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java @@ -125,6 +125,11 @@ ADD, DELETE, COMPILE, + START_TRANSACTION, + COMMIT, + ROLLBACK, + SET_AUTOCOMMIT, + ALTERTABLE_EXCHANGEPARTITION, // ==== Hive command operations ends here ==== // // ==== HiveServer2 metadata api types start here ==== // diff --git ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java index a6226b6..8e61d57 100644 --- ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java +++ ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java @@ -400,6 +400,17 @@ public HivePrivilegeObjectType getObjectType() { op2Priv.put(HiveOperationType.GET_COLUMNS, PrivRequirement.newIOPrivRequirement(SEL_NOGRANT_AR, null)); + op2Priv.put(HiveOperationType.START_TRANSACTION, PrivRequirement.newIOPrivRequirement + (null, null)); + op2Priv.put(HiveOperationType.COMMIT, PrivRequirement.newIOPrivRequirement + (null, null)); + op2Priv.put(HiveOperationType.ROLLBACK, PrivRequirement.newIOPrivRequirement + (null, null)); + op2Priv.put(HiveOperationType.SET_AUTOCOMMIT, PrivRequirement.newIOPrivRequirement + (null, null)); + op2Priv.put(HiveOperationType.ALTERTABLE_EXCHANGEPARTITION, + PrivRequirement.newIOPrivRequirement(null, null)); + } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 37d856c..a849847 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -244,23 +244,6 @@ private HiveTxnManager txnMgr = null; /** - * When {@link #setCurrentTxn(long)} is set to this or {@link #getCurrentTxn()}} returns this it - * indicates that there is not a current transaction in this session. - */ - public static final long NO_CURRENT_TXN = -1L; - - /** - * Transaction currently open - */ - private long currentTxn = NO_CURRENT_TXN; - - /** - * Whether we are in auto-commit state or not. Currently we are always in auto-commit, - * so there are not setters for this yet. 
- */ - private final boolean txnAutoCommit = true; - - /** * store the jars loaded last time */ private final Set preReloadableAuxJars = new HashSet(); @@ -401,18 +384,6 @@ public HiveTxnManager getTxnMgr() { return txnMgr; } - public long getCurrentTxn() { - return currentTxn; - } - - public void setCurrentTxn(long currTxn) { - currentTxn = currTxn; - } - - public boolean isAutoCommit() { - return txnAutoCommit; - } - public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim() throws HiveException { if (hdfsEncryptionShim == null) { try { diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index c5f2d4d..6c77ba4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -545,7 +545,9 @@ private void getWriter(Reporter reporter, ObjectInspector inspector, .reporter(reporter) .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) - .bucket(bucket); + .bucket(bucket) + .statementId(-1);//setting statementId == -1 makes compacted delta files use + //delta_xxxx_yyyy format // Instantiate the underlying output format @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java new file mode 100644 index 0000000..5a0f171 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -0,0 +1,454 @@ +package org.apache.hadoop.hive.ql; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.orc.FileDump; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +/** + * The LockManager is not ready, but for no-concurrency straight-line path we can + * test AC=true, and AC=false with commit/rollback/exception and test resulting data. + * + * Can also test, calling commit in AC=true mode, etc, toggling AC... 
+ */ +public class TestTxnCommands { + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnCommands.class.getCanonicalName() + + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; + //bucket count for test tables; set it to 1 for easier debugging + private static int BUCKET_COUNT = 2; + @Rule + public TestName testName = new TestName(); + private HiveConf hiveConf; + private Driver d; + private static enum Table { + ACIDTBL("acidTbl"), + ACIDTBL2("acidTbl2"), + NONACIDORCTBL("nonAcidOrcTbl"), + NONACIDORCTBL2("nonAcidOrcTbl2"); + + private final String name; + @Override + public String toString() { + return name; + } + Table(String name) { + this.name = name; + } + } + + @Before + public void setUp() throws Exception { + hiveConf = new HiveConf(this.getClass()); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); + TxnDbUtil.setConfValues(hiveConf); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING, true); + TxnDbUtil.prepDb(); + File f = new File(TEST_WAREHOUSE_DIR); + if (f.exists()) { + FileUtil.fullyDelete(f); + } + if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) { + throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR); + } + SessionState.start(new SessionState(hiveConf)); + d = new Driver(hiveConf); + dropTables(); + runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + } + private void dropTables() throws Exception { + for(Table t : Table.values()) { + runStatementOnDriver("drop table if exists " + t); + } + } + @After + public void tearDown() throws Exception { + try { + if (d != null) { + runStatementOnDriver("set autocommit true"); + dropTables(); + d.destroy(); + d.close(); + d = null; + TxnDbUtil.cleanDb(); + } + } finally { + FileUtils.deleteDirectory(new File(TEST_DATA_DIR)); + } + } + @Test + public void testInsertOverwrite() throws Exception { + runStatementOnDriver("insert overwrite table " + Table.NONACIDORCTBL + " select a,b from " + Table.NONACIDORCTBL2); + runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "3(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); + + } + @Ignore("not needed but useful for testing") + @Test + public void testNonAcidInsert() throws Exception { + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); + List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + 
"(a,b) values(2,3)"); + List rs1 = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + } + + /** + * Useful for debugging. Dumps ORC file in JSON to CWD. + */ + private void dumpBucketData(Table table, long txnId, int stmtId, int bucketNum) throws Exception { + if(true) { + return; + } + Path bucket = AcidUtils.createBucketFile(new Path(new Path(TEST_WAREHOUSE_DIR, table.toString().toLowerCase()), AcidUtils.deltaSubdir(txnId, txnId, stmtId)), bucketNum); + FileOutputStream delta = new FileOutputStream(testName.getMethodName() + "_" + bucket.getParent().getName() + "_" + bucket.getName()); +// try { +// FileDump.printJsonData(hiveConf, bucket.toString(), delta); +// } +// catch(FileNotFoundException ex) { + ;//this happens if you change BUCKET_COUNT +// } + delta.close(); + } + /** + * Dump all data in the table by bucket in JSON format + */ + private void dumpTableData(Table table, long txnId, int stmtId) throws Exception { + for(int bucketNum = 0; bucketNum < BUCKET_COUNT; bucketNum++) { + dumpBucketData(table, txnId, stmtId, bucketNum); + } + } + @Test + public void testSimpleAcidInsert() throws Exception { + int[][] rows1 = {{1,2},{3,4}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1)); + //List rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + //Assert.assertEquals("Data didn't match in autocommit=true (rs)", stringifyValues(rows1), rs); + runStatementOnDriver("set autocommit false"); + runStatementOnDriver("START TRANSACTION"); + int[][] rows2 = {{5,6},{7,8}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2)); + List allData = stringifyValues(rows1); + allData.addAll(stringifyValues(rows2)); + List rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs0); + runStatementOnDriver("COMMIT WORK"); + dumpTableData(Table.ACIDTBL, 1, 0); + dumpTableData(Table.ACIDTBL, 2, 0); + CommandProcessorResponse cpr = runStatementOnDriverNegative("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Error didn't match: " + cpr, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN.getErrorCode(), cpr.getErrorCode()); + runStatementOnDriver("set autocommit true"); + List rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs1); + } + @Test + public void testErrors() throws Exception { + runStatementOnDriver("set autocommit true"); + CommandProcessorResponse cpr = runStatementOnDriverNegative("start transaction"); + Assert.assertEquals("Error didn't match: " + cpr, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode()); + runStatementOnDriver("set autocommit false"); + runStatementOnDriver("start transaction"); + CommandProcessorResponse cpr2 = runStatementOnDriverNegative("create table foo(x int, y int)"); + Assert.assertEquals("Expected DDL to fail in an open txn", ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), cpr2.getErrorCode()); + runStatementOnDriver("set autocommit true"); + CommandProcessorResponse cpr3 = runStatementOnDriverNegative("update " + Table.ACIDTBL + " set a = 1 where b != 1"); + Assert.assertEquals("Expected update of bucket column to fail", + "FAILED: SemanticException [Error 10302]: Updating values of bucketing columns is not supported. 
Column a.", + cpr3.getErrorMessage()); + //line below should in principle work but Driver doesn't propagate errorCode properly + //Assert.assertEquals("Expected update of bucket column to fail", ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE.getErrorCode(), cpr3.getErrorCode()); + } + @Test + public void testReadMyOwnInsert() throws Exception { + runStatementOnDriver("set autocommit false"); + runStatementOnDriver("START TRANSACTION"); + List rs = runStatementOnDriver("select * from " + Table.ACIDTBL); + Assert.assertEquals("Expected empty " + Table.ACIDTBL, 0, rs.size()); + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)"); + List rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Can't see my own write", 1, rs0.size()); + runStatementOnDriver("commit"); + runStatementOnDriver("START TRANSACTION"); + List rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + runStatementOnDriver("rollback work"); + Assert.assertEquals("Can't see write after commit", 1, rs1.size()); + } + @Test + public void testImplicitRollback() throws Exception { + runStatementOnDriver("set autocommit false"); + runStatementOnDriver("START TRANSACTION"); + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)"); + List rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Can't see my own write", 1, rs0.size()); + //next command should produce an error + CommandProcessorResponse cpr = runStatementOnDriverNegative("select * from no_such_table"); + Assert.assertEquals("Txn didn't fail?", + "FAILED: SemanticException [Error 10001]: Line 1:14 Table not found 'no_such_table'", + cpr.getErrorMessage()); + runStatementOnDriver("start transaction"); + List rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + runStatementOnDriver("commit"); + Assert.assertEquals("Didn't rollback as expected", 0, rs1.size()); + } + @Test + public void testExplicitRollback() throws Exception { + runStatementOnDriver("set autocommit false"); + runStatementOnDriver("START TRANSACTION"); + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)"); + runStatementOnDriver("ROLLBACK"); + runStatementOnDriver("set autocommit true"); + List rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Rollback didn't rollback", 0, rs.size()); + } + + /** + * This doesn't work due to a bug in storage layer (https://hortonworks.jira.com/browse/RMP-3309) + * 2nd insert clobbers results of 1st insert + * @throws Exception + */ + @Test + public void testMultipleInserts() throws Exception { + runStatementOnDriver("set autocommit false"); + runStatementOnDriver("START TRANSACTION"); + int[][] rows1 = {{1,2},{3,4}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1)); + int[][] rows2 = {{5,6},{7,8}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2)); + List allData = stringifyValues(rows1); + allData.addAll(stringifyValues(rows2)); + List rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Content didn't match before commit rs", allData, rs); + runStatementOnDriver("commit"); + dumpTableData(Table.ACIDTBL, 1, 0); + dumpTableData(Table.ACIDTBL, 1, 1); + runStatementOnDriver("set autocommit true"); + List rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " 
order by a,b"); + Assert.assertEquals("Content didn't match after commit rs1", allData, rs1); + } + @Test + public void testDelete() throws Exception { + int[][] rows1 = {{1,2},{3,4}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1)); + List rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0); + runStatementOnDriver("set autocommit false"); + runStatementOnDriver("START TRANSACTION"); + runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4"); + int[][] updatedData2 = {{1,2}}; + List rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs3); + runStatementOnDriver("commit"); + runStatementOnDriver("set autocommit true"); + List rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData2), rs4); + } + + @Test + public void testUpdateOfInserts() throws Exception { + int[][] rows1 = {{1,2},{3,4}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1)); + List rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0); + runStatementOnDriver("set autocommit false"); + runStatementOnDriver("START TRANSACTION"); + int[][] rows2 = {{5,6},{7,8}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2)); + List rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + List allData = stringifyValues(rows1); + allData.addAll(stringifyValues(rows2)); + Assert.assertEquals("Content didn't match rs1", allData, rs1); + runStatementOnDriver("update " + Table.ACIDTBL + " set b = 1 where b != 1"); + int[][] updatedData = {{1,1},{3,1},{5,1},{7,1}}; + List rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Wrong data after update", stringifyValues(updatedData), rs2); + runStatementOnDriver("commit"); + runStatementOnDriver("set autocommit true"); + List rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData), rs4); + } + @Test + public void testUpdateDeleteOfInserts() throws Exception { + int[][] rows1 = {{1,2},{3,4}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1)); + List rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0); + runStatementOnDriver("set autocommit false"); + runStatementOnDriver("START TRANSACTION"); + int[][] rows2 = {{5,6},{7,8}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2)); + List rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + List allData = stringifyValues(rows1); + allData.addAll(stringifyValues(rows2)); + Assert.assertEquals("Content didn't match rs1", allData, rs1); + runStatementOnDriver("update " + Table.ACIDTBL + " set b = 1 where b != 1"); + int[][] updatedData = {{1,1},{3,1},{5,1},{7,1}}; + List rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Wrong data after update", 
stringifyValues(updatedData), rs2); + runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 7 and b = 1"); + dumpTableData(Table.ACIDTBL, 1, 0); + dumpTableData(Table.ACIDTBL, 2, 0); + dumpTableData(Table.ACIDTBL, 2, 2); + dumpTableData(Table.ACIDTBL, 2, 4); + int[][] updatedData2 = {{1,1},{3,1},{5,1}}; + List rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs3); + runStatementOnDriver("commit"); + runStatementOnDriver("set autocommit true"); + List rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData2), rs4); + } + @Test + public void testMultipleDelete() throws Exception { + int[][] rows1 = {{1,2},{3,4},{5,6},{7,8}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1)); + List rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0); + runStatementOnDriver("set autocommit false"); + runStatementOnDriver("START TRANSACTION"); + runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 8"); + int[][] updatedData2 = {{1,2},{3,4},{5,6}}; + List rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs2); + runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4"); + int[][] updatedData3 = {{1, 2}, {5, 6}}; + List rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Wrong data after delete2", stringifyValues(updatedData3), rs3); + runStatementOnDriver("update " + Table.ACIDTBL + " set b=3"); + dumpTableData(Table.ACIDTBL, 1, 0); + //nothing actually hashes to bucket0, so update/delete deltas don't have it + dumpTableData(Table.ACIDTBL, 2, 0); + dumpTableData(Table.ACIDTBL, 2, 2); + dumpTableData(Table.ACIDTBL, 2, 4); + List rs5 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + int [][] updatedData4 = {{1,3},{5,3}}; + Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData4), rs5); + runStatementOnDriver("commit"); + runStatementOnDriver("set autocommit true"); + List rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData4), rs4); + } +// @Ignore("https://hortonworks.jira.com/browse/BUG-33229") + @Test + public void testDeleteIn() throws Exception { + runStatementOnDriver("delete from " + Table.ACIDTBL + " where a IN (SELECT A.a from " + + Table.ACIDTBL + " A)"); + int[][] tableData = {{1,2},{3,2},{5,2},{1,3},{3,3},{5,3}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); + runStatementOnDriver("insert into " + Table.ACIDTBL2 + "(a,b,c) values(1,7,17),(3,7,17)"); +// runStatementOnDriver("select b from " + Table.ACIDTBL + " where a in (select b from " + Table.NONACIDORCTBL + ")"); + runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.ACIDTBL2 + ")"); +// runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.NONACIDORCTBL + ")"); + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) select a,b from " + Table.ACIDTBL2); + List rs = runStatementOnDriver("select a,b from " + 
Table.ACIDTBL + " order by a,b"); + int[][] updatedData = {{1,7},{3,7},{5,2},{5,3}}; + Assert.assertEquals("Bulk update failed", stringifyValues(updatedData), rs); + } + + /** + * takes raw data and turns it into a string as if from Driver.getResults() + * sorts rows in dictionary order + */ + private List stringifyValues(int[][] rowsIn) { + assert rowsIn.length > 0; + int[][] rows = rowsIn.clone(); + Arrays.sort(rows, new RowComp()); + List rs = new ArrayList(); + for(int[] row : rows) { + assert row.length > 0; + StringBuilder sb = new StringBuilder(); + for(int value : row) { + sb.append(value).append("\t"); + } + sb.setLength(sb.length() - 1); + rs.add(sb.toString()); + } + return rs; + } + private static final class RowComp implements Comparator { + public int compare(int[] row1, int[] row2) { + assert row1 != null && row2 != null && row1.length == row2.length; + for(int i = 0; i < row1.length; i++) { + int comp = Integer.compare(row1[i], row2[i]); + if(comp != 0) { + return comp; + } + } + return 0; + } + } + private String makeValuesClause(int[][] rows) { + assert rows.length > 0; + StringBuilder sb = new StringBuilder("values"); + for(int[] row : rows) { + assert row.length > 0; + if(row.length > 1) { + sb.append("("); + } + for(int value : row) { + sb.append(value).append(","); + } + sb.setLength(sb.length() - 1);//remove trailing comma + if(row.length > 1) { + sb.append(")"); + } + sb.append(","); + } + sb.setLength(sb.length() - 1);//remove trailing comma + return sb.toString(); + } + + private List runStatementOnDriver(String stmt) throws Exception { + CommandProcessorResponse cpr = d.run(stmt); + if(cpr.getResponseCode() != 0) { + throw new RuntimeException(stmt + " failed: " + cpr); + } + List rs = new ArrayList(); + d.getResults(rs); + return rs; + } + private CommandProcessorResponse runStatementOnDriverNegative(String stmt) throws Exception { + CommandProcessorResponse cpr = d.run(stmt); + if(cpr.getResponseCode() != 0) { + return cpr; + } + throw new RuntimeException("Didn't get expected failure!"); + } + + @Ignore + @Test + public void exchangePartition() throws Exception { + runStatementOnDriver("create database ex1"); + runStatementOnDriver("create database ex2"); + + runStatementOnDriver("CREATE TABLE ex1.exchange_part_test1 (f1 string) PARTITIONED BY (ds STRING)"); + runStatementOnDriver("CREATE TABLE ex2.exchange_part_test2 (f1 string) PARTITIONED BY (ds STRING)"); + runStatementOnDriver("ALTER TABLE ex2.exchange_part_test2 ADD PARTITION (ds='2013-04-05')"); + runStatementOnDriver("ALTER TABLE ex1.exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') WITH TABLE ex2.exchange_part_test2"); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java index e400778..c6ae030 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -303,7 +303,8 @@ private FileSinkOperator getFileSink(AcidUtils.Operation writeType, Map partColNames = new HashMap(1); partColNames.put(PARTCOL_NAME, PARTCOL_NAME); dpCtx.setInputToDPCols(partColNames); - desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx); + //todo: does this need the finalDestination? 
+ desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null); } else { desc = new FileSinkDesc(basePath, tableDesc, false); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index 1e3df34..f8ded12 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -46,17 +46,23 @@ public void testCreateFilename() throws Exception { AcidUtils.createFilename(p, options).toString()); options.bucket(123); assertEquals("/tmp/00123_0", - AcidUtils.createFilename(p, options).toString()); + AcidUtils.createFilename(p, options).toString()); options.bucket(23) .minimumTransactionId(100) .maximumTransactionId(200) .writingBase(true) .setOldStyle(false); assertEquals("/tmp/base_0000200/bucket_00023", - AcidUtils.createFilename(p, options).toString()); + AcidUtils.createFilename(p, options).toString()); options.writingBase(false); + assertEquals("/tmp/delta_0000100_0000200_0000/bucket_00023", + AcidUtils.createFilename(p, options).toString()); + options.statementId(-1); assertEquals("/tmp/delta_0000100_0000200/bucket_00023", - AcidUtils.createFilename(p, options).toString()); + AcidUtils.createFilename(p, options).toString()); + options.statementId(7); + assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023", + AcidUtils.createFilename(p, options).toString()); } @Test @@ -236,7 +242,6 @@ public void testOverlapingDelta() throws Exception { new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/delta_0060_60/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0])); Path part = new MockPath(fs, "mock:/tbl/part1"); AcidUtils.Directory dir = @@ -254,6 +259,45 @@ public void testOverlapingDelta() throws Exception { assertEquals("mock:/tbl/part1/delta_0000063_63", delts.get(3).getPath().toString()); } + /** + * Hive 1.3.0 delta dir naming scheme which supports multi-statement txns + * @throws Exception + */ + @Test + public void testOverlapingDelta2() throws Exception { + Configuration conf = new Configuration(); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/delta_0000063_63_0/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_000062_62_0/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_000062_62_3/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_00061_61_0/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_0060_60_1/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_0060_60_4/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_0060_60_7/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_058_58/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0])); + Path part = new MockPath(fs, "mock:/tbl/part1"); + AcidUtils.Directory dir = + AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:")); + assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString()); + List obsolete = dir.getObsolete(); + assertEquals(5, obsolete.size()); 
+ assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_058_58", obsolete.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_0060_60_1", obsolete.get(2).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_0060_60_4", obsolete.get(3).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_0060_60_7", obsolete.get(4).getPath().toString()); + List delts = dir.getCurrentDirectories(); + assertEquals(5, delts.size()); + assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_00061_61_0", delts.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_000062_62_0", delts.get(2).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_000062_62_3", delts.get(3).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_0000063_63_0", delts.get(4).getPath().toString()); + } + @Test public void deltasWithOpenTxnInRead() throws Exception { Configuration conf = new Configuration(); @@ -268,6 +312,27 @@ public void deltasWithOpenTxnInRead() throws Exception { assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString()); } + /** + * @since 1.3.0 + * @throws Exception + */ + @Test + public void deltasWithOpenTxnInRead2() throws Exception { + Configuration conf = new Configuration(); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_4_4_1/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new byte[0])); + Path part = new MockPath(fs, "mock:/tbl/part1"); + AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4")); + List delts = dir.getCurrentDirectories(); + assertEquals(2, delts.size()); + assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString()); + } + @Test public void deltasWithOpenTxnsNotInCompact() throws Exception { Configuration conf = new Configuration(); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 0246cd5..7671d43 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -1408,7 +1408,7 @@ public void testVectorizationWithAcid() throws Exception { Path partDir = new Path(conf.get("mapred.input.dir")); OrcRecordUpdater writer = new OrcRecordUpdater(partDir, new AcidOutputFormat.Options(conf).maximumTransactionId(10) - .writingBase(true).bucket(0).inspector(inspector)); + .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir)); for(int i=0; i < 100; ++i) { BigRow row = new BigRow(i); writer.insert(10, row); @@ -1559,7 +1559,7 @@ public void testCombinationInputFormatWithAcid() throws Exception { // write a base file in partition 0 OrcRecordUpdater writer = new OrcRecordUpdater(partDir[0], new AcidOutputFormat.Options(conf).maximumTransactionId(10) - .writingBase(true).bucket(0).inspector(inspector)); + .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir[0])); for(int i=0; i < 10; ++i) { writer.insert(10, new MyRow(i, 2 * i)); 
} @@ -1572,7 +1572,7 @@ public void testCombinationInputFormatWithAcid() throws Exception { // write a delta file in partition 0 writer = new OrcRecordUpdater(partDir[0], new AcidOutputFormat.Options(conf).maximumTransactionId(10) - .writingBase(true).bucket(1).inspector(inspector)); + .writingBase(true).bucket(1).inspector(inspector).finalDestination(partDir[0])); for(int i=10; i < 20; ++i) { writer.insert(10, new MyRow(i, 2*i)); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index 921e954..39f71f1 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -62,12 +62,12 @@ public class TestOrcRawRecordMerger { private static final Log LOG = LogFactory.getLog(TestOrcRawRecordMerger.class); - +//todo: why is statementId -1? @Test public void testOrdering() throws Exception { ReaderKey left = new ReaderKey(100, 200, 1200, 300); ReaderKey right = new ReaderKey(); - right.setValues(100, 200, 1000, 200); + right.setValues(100, 200, 1000, 200,1); assertTrue(right.compareTo(left) < 0); assertTrue(left.compareTo(right) > 0); assertEquals(false, left.equals(right)); @@ -76,16 +76,16 @@ public void testOrdering() throws Exception { assertEquals(true, right.equals(left)); right.setRowId(2000); assertTrue(right.compareTo(left) > 0); - left.setValues(1, 2, 3, 4); - right.setValues(100, 2, 3, 4); + left.setValues(1, 2, 3, 4,-1); + right.setValues(100, 2, 3, 4,-1); assertTrue(left.compareTo(right) < 0); assertTrue(right.compareTo(left) > 0); - left.setValues(1, 2, 3, 4); - right.setValues(1, 100, 3, 4); + left.setValues(1, 2, 3, 4,-1); + right.setValues(1, 100, 3, 4,-1); assertTrue(left.compareTo(right) < 0); assertTrue(right.compareTo(left) > 0); - left.setValues(1, 2, 3, 100); - right.setValues(1, 2, 3, 4); + left.setValues(1, 2, 3, 100,-1); + right.setValues(1, 2, 3, 4,-1); assertTrue(left.compareTo(right) < 0); assertTrue(right.compareTo(left) > 0); @@ -177,7 +177,7 @@ public void testReaderPair() throws Exception { RecordIdentifier minKey = new RecordIdentifier(10, 20, 30); RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60); ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey, - new Reader.Options()); + new Reader.Options(), 0); RecordReader recordReader = pair.recordReader; assertEquals(10, key.getTransactionId()); assertEquals(20, key.getBucketId()); @@ -203,7 +203,7 @@ public void testReaderPairNoMin() throws Exception { Reader reader = createMockReader(); ReaderPair pair = new ReaderPair(key, reader, 20, null, null, - new Reader.Options()); + new Reader.Options(), 0); RecordReader recordReader = pair.recordReader; assertEquals(10, key.getTransactionId()); assertEquals(20, key.getBucketId()); @@ -489,7 +489,7 @@ public void testEmpty() throws Exception { // write the empty base AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .inspector(inspector).bucket(BUCKET).writingBase(true) - .maximumTransactionId(100); + .maximumTransactionId(100).finalDestination(root); of.getRecordUpdater(root, options).close(false); ValidTxnList txnList = new ValidReadTxnList("200:"); @@ -515,6 +515,10 @@ public void testEmpty() throws Exception { */ @Test public void testNewBaseAndDelta() throws Exception { + testNewBaseAndDelta(false); + testNewBaseAndDelta(true); + } + private void testNewBaseAndDelta(boolean use130Format) throws Exception { final int BUCKET = 
10; String[] values = new String[]{"first", "second", "third", "fourth", "fifth", "sixth", "seventh", "eighth", @@ -532,7 +536,10 @@ public void testNewBaseAndDelta() throws Exception { // write the base AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) - .inspector(inspector).bucket(BUCKET); + .inspector(inspector).bucket(BUCKET).finalDestination(root); + if(!use130Format) { + options.statementId(-1); + } RecordUpdater ru = of.getRecordUpdater(root, options.writingBase(true).maximumTransactionId(100)); for(String v: values) { @@ -554,7 +561,8 @@ public void testNewBaseAndDelta() throws Exception { AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList); assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory()); - assertEquals(new Path(root, "delta_0000200_0000200"), + assertEquals(new Path(root, use130Format ? + AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)), directory.getCurrentDirectories().get(0).getPath()); Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(), @@ -829,7 +837,7 @@ synchronized void addedRow() throws IOException { // write a delta AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .writingBase(false).minimumTransactionId(1).maximumTransactionId(1) - .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5); + .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5).finalDestination(root); RecordUpdater ru = of.getRecordUpdater(root, options); values = new String[]{"0.0", null, null, "1.1", null, null, null, "ignore.7"}; @@ -920,6 +928,7 @@ synchronized void addedRow() throws IOException { options.orcOptions(OrcFile.writerOptions(conf) .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE) .memory(mgr)); + options.finalDestination(root); RecordUpdater ru = of.getRecordUpdater(root, options); String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3", "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"}; @@ -1004,7 +1013,8 @@ public void testRecordReaderDelta() throws Exception { AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .bucket(BUCKET).inspector(inspector).filesystem(fs) - .writingBase(false).minimumTransactionId(1).maximumTransactionId(1); + .writingBase(false).minimumTransactionId(1).maximumTransactionId(1) + .finalDestination(root); RecordUpdater ru = of.getRecordUpdater(root, options); String[] values = new String[]{"a", "b", "c", "d", "e"}; for(int i=0; i < values.length; ++i) { @@ -1047,6 +1057,14 @@ public void testRecordReaderDelta() throws Exception { */ @Test public void testRecordReaderIncompleteDelta() throws Exception { + testRecordReaderIncompleteDelta(false); + testRecordReaderIncompleteDelta(true); + } + /** + * + * @param use130Format true means use delta_0001_0001_0000 format, else delta_0001_00001 + */ + private void testRecordReaderIncompleteDelta(boolean use130Format) throws Exception { final int BUCKET = 1; Configuration conf = new Configuration(); OrcOutputFormat of = new OrcOutputFormat(); @@ -1063,7 +1081,10 @@ public void testRecordReaderIncompleteDelta() throws Exception { AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .writingBase(true).minimumTransactionId(0).maximumTransactionId(0) - .bucket(BUCKET).inspector(inspector).filesystem(fs); + .bucket(BUCKET).inspector(inspector).filesystem(fs).finalDestination(root); + if(!use130Format) { + options.statementId(-1); + } RecordUpdater ru = of.getRecordUpdater(root, options); String[] 
     values= new String[]{"1", "2", "3", "4", "5"};
     for(int i=0; i < values.length; ++i) {
@@ -1110,8 +1131,8 @@ public void testRecordReaderIncompleteDelta() throws Exception {
     splits = inf.getSplits(job, 1);
     assertEquals(2, splits.length);
     rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
-    Path sideFile = new Path(root +
-        "/delta_0000010_0000019/bucket_00001_flush_length");
+    Path sideFile = new Path(root + "/" + (use130Format ? AcidUtils.deltaSubdir(10,19,0) :
+      AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length");
     assertEquals(true, fs.exists(sideFile));
     assertEquals(24, fs.getFileStatus(sideFile).getLen());
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
index 22bd4b9..22030b4 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
@@ -97,7 +97,8 @@ public void testWriter() throws Exception {
         .minimumTransactionId(10)
         .maximumTransactionId(19)
         .inspector(inspector)
-        .reporter(Reporter.NULL);
+        .reporter(Reporter.NULL)
+        .finalDestination(root);
     RecordUpdater updater = new OrcRecordUpdater(root, options);
     updater.insert(11, new MyRow("first"));
     updater.insert(11, new MyRow("second"));
@@ -197,7 +198,8 @@ public void testUpdates() throws Exception {
         .maximumTransactionId(100)
         .inspector(inspector)
         .reporter(Reporter.NULL)
-        .recordIdColumn(1);
+        .recordIdColumn(1)
+        .finalDestination(root);
     RecordUpdater updater = new OrcRecordUpdater(root, options);
     updater.update(100, new MyRow("update", 30, 10, bucket));
     updater.delete(100, new MyRow("", 60, 40, bucket));
diff --git ql/src/test/org/apache/hadoop/hive/ql/parse/positive/TestTransactionStatement.java ql/src/test/org/apache/hadoop/hive/ql/parse/positive/TestTransactionStatement.java
new file mode 100644
index 0000000..b7f8263
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/parse/positive/TestTransactionStatement.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.positive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.parse.ParseException;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Basic parser tests for multi-statement transactions
+ */
+public class TestTransactionStatement {
+  private static SessionState sessionState;
+  private ParseDriver pd;
+
+  @BeforeClass
+  public static void initialize() {
+    HiveConf conf = new HiveConf(SemanticAnalyzer.class);
+    sessionState = SessionState.start(conf);
+  }
+  @AfterClass
+  public static void cleanUp() throws IOException {
+    if(sessionState != null) {
+      sessionState.close();
+    }
+  }
+
+  @Before
+  public void setup() throws SemanticException {
+    pd = new ParseDriver();
+  }
+
+  ASTNode parse(String query) throws ParseException {
+    ASTNode nd = pd.parse(query);
+    return (ASTNode) nd.getChild(0);
+  }
+  @Test
+  public void testTxnStart() throws ParseException {
+    ASTNode ast = parse("START TRANSACTION");
+    Assert.assertEquals("AST doesn't match",
+      "TOK_START_TRANSACTION", ast.toStringTree());
+
+    ast = parse("START TRANSACTION ISOLATION LEVEL SNAPSHOT");
+    Assert.assertEquals("AST doesn't match",
+      "(TOK_START_TRANSACTION (TOK_ISOLATION_LEVEL TOK_ISOLATION_SNAPSHOT))", ast.toStringTree());
+
+    ast = parse("START TRANSACTION READ ONLY");
+    Assert.assertEquals("AST doesn't match",
+      "(TOK_START_TRANSACTION (TOK_TXN_ACCESS_MODE TOK_TXN_READ_ONLY))", ast.toStringTree());
+
+    ast = parse("START TRANSACTION READ WRITE, ISOLATION LEVEL SNAPSHOT");
+    Assert.assertEquals("AST doesn't match",
+      "(TOK_START_TRANSACTION (TOK_TXN_ACCESS_MODE TOK_TXN_READ_WRITE) (TOK_ISOLATION_LEVEL TOK_ISOLATION_SNAPSHOT))", ast.toStringTree());
+
+  }
+  @Test
+  public void testTxnCommitRollback() throws ParseException {
+    ASTNode ast = parse("COMMIT");
+    Assert.assertEquals("AST doesn't match", "TOK_COMMIT", ast.toStringTree());
+    ast = parse("COMMIT WORK");
+    Assert.assertEquals("AST doesn't match", "TOK_COMMIT", ast.toStringTree());
+    ast = parse("ROLLBACK");
+    Assert.assertEquals("AST doesn't match", "TOK_ROLLBACK", ast.toStringTree());
+    ast = parse("ROLLBACK WORK");
+    Assert.assertEquals("AST doesn't match", "TOK_ROLLBACK", ast.toStringTree());
+  }
+
+  @Test
+  public void testAutoCommit() throws ParseException {
+    ASTNode ast = parse("SET AUTOCOMMIT TRUE");
+    Assert.assertEquals("AST doesn't match", "(TOK_SET_AUTOCOMMIT TOK_TRUE)", ast.toStringTree());
+    ast = parse("SET AUTOCOMMIT FALSE");
+    Assert.assertEquals("AST doesn't match", "(TOK_SET_AUTOCOMMIT TOK_FALSE)", ast.toStringTree());
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 671e122..21adc9d 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -241,7 +241,7 @@ private StorageDescriptor newStorageDescriptor(String location, List sort
     return sd;
   }
 
-  // I can't do this with @Before because I want to be able to control when the thead starts
+  // I can't do this with @Before because I want to be able to control when the thread starts
   private void startThread(char type, boolean stopAfterOne) throws Exception {
     startThread(type, stopAfterOne, new AtomicBoolean());
   }
@@ -284,7 +284,7 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn,
     switch (type) {
       case BASE: filename = "base_" + maxTxn; break;
       case LENGTH_FILE: // Fall through to delta
-      case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break;
+      case DELTA: filename = makeDeltaDirName(minTxn, maxTxn); break;
       case LEGACY: break; // handled below
     }
@@ -508,5 +508,21 @@ public void close(boolean abort) throws IOException {
     }
   }
+  /**
+   * in Hive 1.3.0 delta file names changed to delta_xxxx_yyyy_zzzz; prior to that
+   * the name was delta_xxxx_yyyy. We want to run compaction tests such that both formats
+   * are used since new (1.3) code has to be able to read old files.
+   */
+  abstract boolean useHive130DeltaDirName();
+  String makeDeltaDirName(long minTxnId, long maxTxnId) {
+    return useHive130DeltaDirName() ?
+      AcidUtils.deltaSubdir(minTxnId, maxTxnId, 0) : AcidUtils.deltaSubdir(minTxnId, maxTxnId);
+  }
+  /**
+   * delta dir name after compaction
+   */
+  String makeDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
+    return AcidUtils.deltaSubdir(minTxnId, maxTxnId);
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index ffdbb9a..0db732c 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -139,7 +139,7 @@ public void cleanupAfterMinorTableCompaction() throws Exception {
     boolean sawBase = false, sawDelta = false;
     for (Path p : paths) {
       if (p.getName().equals("base_20")) sawBase = true;
-      else if (p.getName().equals("delta_21_24")) sawDelta = true;
+      else if (p.getName().equals(makeDeltaDirName(21, 24))) sawDelta = true;
       else Assert.fail("Unexpected file " + p.getName());
     }
     Assert.assertTrue(sawBase);
@@ -177,7 +177,7 @@ public void cleanupAfterMinorPartitionCompaction() throws Exception {
     boolean sawBase = false, sawDelta = false;
     for (Path path : paths) {
       if (path.getName().equals("base_20")) sawBase = true;
-      else if (path.getName().equals("delta_21_24")) sawDelta = true;
+      else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) sawDelta = true;
       else Assert.fail("Unexpected file " + path.getName());
     }
     Assert.assertTrue(sawBase);
@@ -480,4 +480,8 @@ public void droppedPartition() throws Exception {
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(0, rsp.getCompactsSize());
   }
+  @Override
+  boolean useHive130DeltaDirName() {
+    return false;
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
new file mode 100644
index 0000000..c637dd1
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
@@ -0,0 +1,14 @@
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+/**
+ * Same as TestCleaner but tests delta file names in Hive 1.3.0 format
+ */
+public class TestCleaner2 extends TestCleaner {
+  public TestCleaner2() throws Exception {
+    super();
+  }
+  @Override
+  boolean useHive130DeltaDirName() {
+    return false;
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 00b13de..0b0b1da 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -713,5 +713,9 @@ public void dropPartition() throws Exception {
     List compacts = rsp.getCompacts();
     Assert.assertEquals(0, compacts.size());
   }
+  @Override
+  boolean useHive130DeltaDirName() {
+    return false;
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index bebac54..11e5333 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -281,7 +281,7 @@ public void minorTableWithBase() throws Exception {
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
+      if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath());
         Assert.assertEquals(2, buckets.length);
@@ -296,6 +296,10 @@ public void minorTableWithBase() throws Exception {
     Assert.assertTrue(sawNewDelta);
   }
 
+  /**
+   * todo: fix https://issues.apache.org/jira/browse/HIVE-9995
+   * @throws Exception
+   */
   @Test
   public void minorWithOpenInMiddle() throws Exception {
     LOG.debug("Starting minorWithOpenInMiddle");
@@ -321,15 +325,18 @@ public void minorWithOpenInMiddle() throws Exception {
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(5, stat.length);
+    boolean is130 = this instanceof TestWorker2;
+    Assert.assertEquals(is130 ? 5 : 4, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
     Assert.assertEquals("base_20", stat[0].getPath().getName());
-    Assert.assertEquals("delta_0000021_0000022", stat[1].getPath().getName());
-    Assert.assertEquals("delta_21_22", stat[2].getPath().getName());
-    Assert.assertEquals("delta_23_25", stat[3].getPath().getName());
-    Assert.assertEquals("delta_26_27", stat[4].getPath().getName());
+    if(is130) {//in 1.3.0 orig delta is delta_00021_00022_0000 and compacted one is delta_00021_00022...
+      Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[1].getPath().getName());
+    }
+    Assert.assertEquals(makeDeltaDirName(21, 22), stat[1 + (is130 ? 1 : 0)].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(23, 25), stat[2 + (is130 ? 1 : 0)].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(26, 27), stat[3 + (is130 ? 1 : 0)].getPath().getName());
   }
 
   @Test
@@ -362,10 +369,10 @@ public void minorWithAborted() throws Exception {
     // Find the new delta file and make sure it has the right contents
    Arrays.sort(stat);
     Assert.assertEquals("base_20", stat[0].getPath().getName());
-    Assert.assertEquals("delta_0000021_0000027", stat[1].getPath().getName());
-    Assert.assertEquals("delta_21_22", stat[2].getPath().getName());
-    Assert.assertEquals("delta_23_25", stat[3].getPath().getName());
-    Assert.assertEquals("delta_26_27", stat[4].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName());
+    Assert.assertEquals(makeDeltaDirNameCompacted(21, 27), stat[2].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
   }
 
   @Test
@@ -398,7 +405,7 @@ public void minorPartitionWithBase() throws Exception {
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
+      if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath());
         Assert.assertEquals(2, buckets.length);
@@ -441,7 +448,7 @@ public void minorTableNoBase() throws Exception {
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals("delta_0000001_0000004")) {
+      if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(1, 4))) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath());
         Assert.assertEquals(2, buckets.length);
@@ -661,7 +668,7 @@ public void minorTableLegacy() throws Exception {
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
+      if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath());
         Assert.assertEquals(2, buckets.length);
@@ -760,9 +767,9 @@ public void majorWithOpenInMiddle() throws Exception {
     Arrays.sort(stat);
     Assert.assertEquals("base_0000022", stat[0].getPath().getName());
     Assert.assertEquals("base_20", stat[1].getPath().getName());
-    Assert.assertEquals("delta_21_22", stat[2].getPath().getName());
-    Assert.assertEquals("delta_23_25", stat[3].getPath().getName());
-    Assert.assertEquals("delta_26_27", stat[4].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
   }
 
   @Test
@@ -796,9 +803,13 @@ public void majorWithAborted() throws Exception {
     Arrays.sort(stat);
     Assert.assertEquals("base_0000027", stat[0].getPath().getName());
     Assert.assertEquals("base_20", stat[1].getPath().getName());
-    Assert.assertEquals("delta_21_22", stat[2].getPath().getName());
-    Assert.assertEquals("delta_23_25", stat[3].getPath().getName());
-    Assert.assertEquals("delta_26_27", stat[4].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
+  }
+  @Override
+  boolean useHive130DeltaDirName() {
+    return false;
   }
 
   @Test
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
new file mode 100644
index 0000000..3b5283a
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
@@ -0,0 +1,16 @@
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+/**
+ * Same as TestWorker but tests delta file names in Hive 1.3.0 format
+ */
+public class TestWorker2 extends TestWorker {
+
+  public TestWorker2() throws Exception {
+    super();
+  }
+
+  @Override
+  boolean useHive130DeltaDirName() {
+    return true;
+  }
+}
diff --git ql/src/test/results/clientpositive/exchange_partition.q.out ql/src/test/results/clientpositive/exchange_partition.q.out
index 4ff1f6c..5b21eaf 100644
--- ql/src/test/results/clientpositive/exchange_partition.q.out
+++ ql/src/test/results/clientpositive/exchange_partition.q.out
@@ -59,9 +59,9 @@ POSTHOOK: type: SHOWPARTITIONS
 POSTHOOK: Input: ex2@exchange_part_test2
 ds=2013-04-05
 PREHOOK: query: ALTER TABLE ex1.exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') WITH TABLE ex2.exchange_part_test2
-PREHOOK: type: null
+PREHOOK: type: ALTERTABLE_EXCHANGEPARTITION
 POSTHOOK: query: ALTER TABLE ex1.exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') WITH TABLE ex2.exchange_part_test2
-POSTHOOK: type: null
+POSTHOOK: type: ALTERTABLE_EXCHANGEPARTITION
 PREHOOK: query: SHOW PARTITIONS ex1.exchange_part_test1
 PREHOOK: type: SHOWPARTITIONS
 PREHOOK: Input: ex1@exchange_part_test1
diff --git ql/src/test/results/clientpositive/exchange_partition2.q.out ql/src/test/results/clientpositive/exchange_partition2.q.out
index d47fb05..8c7c583 100644
--- ql/src/test/results/clientpositive/exchange_partition2.q.out
+++ ql/src/test/results/clientpositive/exchange_partition2.q.out
@@ -47,9 +47,9 @@ POSTHOOK: type: SHOWPARTITIONS
 POSTHOOK: Input: default@exchange_part_test2
 ds=2013-04-05/hr=1
 PREHOOK: query: ALTER TABLE exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05', hr='1') WITH TABLE exchange_part_test2
-PREHOOK: type: null
+PREHOOK: type: ALTERTABLE_EXCHANGEPARTITION
 POSTHOOK: query: ALTER TABLE exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05', hr='1') WITH TABLE exchange_part_test2
-POSTHOOK: type: null
+POSTHOOK: type: ALTERTABLE_EXCHANGEPARTITION
 PREHOOK: query: SHOW PARTITIONS exchange_part_test1
 PREHOOK: type: SHOWPARTITIONS
 PREHOOK: Input: default@exchange_part_test1
diff --git ql/src/test/results/clientpositive/exchange_partition3.q.out ql/src/test/results/clientpositive/exchange_partition3.q.out
index 3133ad7..3815861 100644
--- ql/src/test/results/clientpositive/exchange_partition3.q.out
+++ ql/src/test/results/clientpositive/exchange_partition3.q.out
@@ -64,10 +64,10 @@ ds=2013-04-05/hr=1
 ds=2013-04-05/hr=2
 PREHOOK: query: -- This will exchange both partitions hr=1 and hr=2
 ALTER TABLE exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') WITH TABLE exchange_part_test2
-PREHOOK: type: null
+PREHOOK: type: ALTERTABLE_EXCHANGEPARTITION
 POSTHOOK: query: -- This will exchange both partitions hr=1 and hr=2
 ALTER TABLE exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') WITH TABLE exchange_part_test2
-POSTHOOK: type: null
+POSTHOOK: type: ALTERTABLE_EXCHANGEPARTITION
 PREHOOK: query: SHOW PARTITIONS exchange_part_test1
 PREHOOK: type: SHOWPARTITIONS
 PREHOOK: Input: default@exchange_part_test1
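
Note (illustrative, not part of the patch): the makeDeltaDirName()/makeDeltaDirNameCompacted() helpers added to CompactorTest wrap AcidUtils.deltaSubdir() to cover the pre-1.3.0 delta_xxxx_yyyy layout and the 1.3.0 delta_xxxx_yyyy_zzzz layout. The minimal sketch below mimics those two name formats so the test assertions above are easier to read; it assumes 7-digit zero-padded transaction ids and a 4-digit statement-id suffix, which is what the expected names in TestWorker (e.g. "delta_0000021_0000024") suggest, and the class/method names here are hypothetical stand-ins rather than Hive's actual implementation.

// Illustrative sketch of the two delta directory name formats exercised by the tests.
public class DeltaDirNameSketch {
  // Pre-1.3.0 format, also used for the directory produced by minor compaction:
  // delta_<minTxnId>_<maxTxnId>
  static String deltaSubdir(long minTxnId, long maxTxnId) {
    return String.format("delta_%07d_%07d", minTxnId, maxTxnId);
  }
  // 1.3.0+ format written per statement: delta_<minTxnId>_<maxTxnId>_<statementId>
  static String deltaSubdir(long minTxnId, long maxTxnId, int statementId) {
    return deltaSubdir(minTxnId, maxTxnId) + String.format("_%04d", statementId);
  }
  public static void main(String[] args) {
    System.out.println(deltaSubdir(21, 24));    // delta_0000021_0000024
    System.out.println(deltaSubdir(21, 24, 0)); // delta_0000021_0000024_0000
  }
}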