diff --git ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
index 2a737bbf8a..fb7047d89d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
+++ ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java
@@ -36,18 +36,25 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.LogUtils;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.cli.CommonCliOptions;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.HiveStrictManagedUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -58,12 +65,13 @@
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
-
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 public class HiveStrictManagedMigration {
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveStrictManagedMigration.class);
@@ -874,6 +882,18 @@ HiveUpdater getHiveUpdater() throws HiveException {
     return hiveUpdater;
   }
 
+  private static final class TxnCtx {
+    public final long writeId;
+    public final String validWriteIds;
+    public final long txnId;
+
+    public TxnCtx(long writeId, String validWriteIds, long txnId) {
+      this.writeId = writeId;
+      this.txnId = txnId;
+      this.validWriteIds = validWriteIds;
+    }
+  }
+
   class HiveUpdater {
     Hive hive;
 
@@ -897,16 +917,111 @@ void updateDbLocation(Database db, Path newLocation) throws HiveException {
       hive.alterDatabase(db.getName(), db);
     }
 
+
     void updateTableLocation(Table table, Path newLocation) throws HiveException {
       String msg = String.format("ALTER TABLE %s SET LOCATION '%s'",
           getQualifiedName(table), newLocation);
       LOG.info(msg);
+      boolean isTxn = TxnUtils.isTransactionalTable(table);
 
       org.apache.hadoop.hive.ql.metadata.Table modifiedTable =
           new org.apache.hadoop.hive.ql.metadata.Table(table);
       modifiedTable.setDataLocation(newLocation);
-      hive.alterTable(table.getCatName(), table.getDbName(), table.getTableName(),
-          modifiedTable, false, null, false);
+
+      alterTableInternal(isTxn, table, modifiedTable);
+    }
+
+    private void alterTableInternal(boolean wasTxn, Table table,
+        org.apache.hadoop.hive.ql.metadata.Table modifiedTable) throws HiveException {
+      IMetaStoreClient msc = getMSC();
+      TxnCtx txnCtx = generateTxnCtxForAlter(table, msc, wasTxn);
+      boolean isOk = false;
+      try {
+        String validWriteIds = null;
+        if (txnCtx != null) {
+          validWriteIds = txnCtx.validWriteIds;
+          modifiedTable.getTTable().setWriteId(txnCtx.writeId);
+        }
+        msc.alter_table(table.getCatName(), table.getDbName(), table.getTableName(),
+            modifiedTable.getTTable(), null, validWriteIds);
+        isOk = true;
+      } catch (TException ex) {
+        throw new HiveException(ex);
+      } finally {
+        closeTxnCtx(txnCtx, msc, isOk);
+      }
+    }
+
+    private void alterPartitionInternal(Table table,
+        org.apache.hadoop.hive.ql.metadata.Partition modifiedPart) throws HiveException {
+      IMetaStoreClient msc = getMSC();
+      TxnCtx txnCtx = generateTxnCtxForAlter(table, msc, null);
+      boolean isOk = false;
+      try {
+        String validWriteIds = null;
+        if (txnCtx != null) {
+          validWriteIds = txnCtx.validWriteIds;
+          modifiedPart.getTPartition().setWriteId(txnCtx.writeId);
+        }
+        msc.alter_partition(table.getDbName(), table.getTableName(),
+            modifiedPart.getTPartition(), null, validWriteIds);
+        isOk = true;
+      } catch (TException ex) {
+        throw new HiveException(ex);
+      } finally {
+        closeTxnCtx(txnCtx, msc, isOk);
+      }
+    }
+
+    private IMetaStoreClient getMSC() throws HiveException {
+      try {
+        return hive.getMSC();
+      } catch (MetaException ex) {
+        throw new HiveException(ex);
+      }
+    }
+
+    private TxnCtx generateTxnCtxForAlter(
+        Table table, IMetaStoreClient msc, Boolean wasTxn) throws HiveException {
+      if ((wasTxn != null && !wasTxn) || !TxnUtils.isTransactionalTable(table.getParameters())) {
+        return null;
+      }
+      try {
+        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        long txnId = msc.openTxn(ugi == null ? "anonymous" : ugi.getShortUserName());
+        TxnCtx result = null;
+        try {
+          ValidTxnList txns = msc.getValidTxns(txnId);
+          String fqn = table.getDbName() + "." + table.getTableName();
+          List<TableValidWriteIds> writeIdsObj = msc.getValidWriteIds(
+              Lists.newArrayList(fqn), txns.toString());
+          String validWriteIds = TxnUtils.createValidTxnWriteIdList(txnId, writeIdsObj)
+              .getTableValidWriteIdList(fqn).writeToString();
+          long writeId = msc.allocateTableWriteId(txnId, table.getDbName(), table.getTableName());
+          result = new TxnCtx(writeId, validWriteIds, txnId);
+        } finally {
+          if (result == null) {
+            msc.abortTxns(Lists.newArrayList(txnId));
+          }
+        }
+        return result;
+      } catch (IOException | TException ex) {
+        throw new HiveException(ex);
+      }
+    }
+
+    private void closeTxnCtx(TxnCtx txnCtx, IMetaStoreClient msc, boolean isOk)
+        throws HiveException {
+      if (txnCtx == null) return;
+      try {
+        if (isOk) {
+          msc.commitTxn(txnCtx.txnId);
+        } else {
+          msc.abortTxns(Lists.newArrayList(txnCtx.txnId));
+        }
+      } catch (TException ex) {
+        throw new HiveException(ex);
+      }
     }
 
     void updatePartitionLocation(String dbName, Table table, String partName, Partition part, Path newLocation)
@@ -920,11 +1035,13 @@ void updatePartitionLocation(String dbName, Table table, String partName, Partit
               new org.apache.hadoop.hive.ql.metadata.Table(table), part);
       modifiedPart.setLocation(newLocation.toString());
 
-      hive.alterPartition(dbName, table.getTableName(), modifiedPart, null, false);
+      alterPartitionInternal(table, modifiedPart);
     }
 
     void updateTableProperties(Table table, Map<String, String> props) throws HiveException {
       StringBuilder sb = new StringBuilder();
+      boolean isTxn = TxnUtils.isTransactionalTable(table);
+
       org.apache.hadoop.hive.ql.metadata.Table modifiedTable =
           new org.apache.hadoop.hive.ql.metadata.Table(table);
       if (props.size() == 0) {
@@ -951,8 +1068,9 @@ void updateTableProperties(Table table, Map<String, String> props) throws HiveEx
           getQualifiedName(table), sb.toString());
       LOG.info(msg);
 
-      hive.alterTable(table.getCatName(), table.getDbName(), table.getTableName(), modifiedTable,
-          false, null, false);
+      // Note: for now, this is always called to convert the table to either external, or ACID/MM,
+      // so the original table would be non-txn and the transaction wouldn't be opened.
+      alterTableInternal(isTxn, table, modifiedTable);
     }
   }
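Reviewer note: the new generateTxnCtxForAlter()/closeTxnCtx() helpers wrap each metadata alter of a transactional table in a metastore transaction: open a txn, snapshot the valid txn/write-id lists, allocate a table write id, perform the alter with that write id and the serialized write-id list, then commit on success or abort on failure. Below is a condensed, illustrative sketch of that flow (not part of the patch; the class/method names and the hard-coded "anonymous" user are made up for the example, whereas the patch derives the user from the current UGI).

// Illustrative sketch only: condensed version of the txn lifecycle the patch adds around alters.
import java.util.List;

import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.thrift.TException;

import com.google.common.collect.Lists;

final class TxnAlterSketch {
  // Hypothetical helper: alter a transactional table's metadata under an open txn.
  static void alterUnderTxn(IMetaStoreClient msc, Table newTable) throws TException {
    long txnId = msc.openTxn("anonymous");          // the patch uses the current UGI short user name
    boolean ok = false;
    try {
      ValidTxnList txns = msc.getValidTxns(txnId);  // snapshot of valid txns as of this txn
      String fqn = newTable.getDbName() + "." + newTable.getTableName();
      List<TableValidWriteIds> ids = msc.getValidWriteIds(Lists.newArrayList(fqn), txns.toString());
      String validWriteIds = TxnUtils.createValidTxnWriteIdList(txnId, ids)
          .getTableValidWriteIdList(fqn).writeToString();
      long writeId = msc.allocateTableWriteId(txnId, newTable.getDbName(), newTable.getTableName());
      newTable.setWriteId(writeId);                 // stamp the new metadata with the write id
      msc.alter_table(newTable.getCatName(), newTable.getDbName(), newTable.getTableName(),
          newTable, null, validWriteIds);           // alter carries the serialized write-id list
      ok = true;
    } finally {
      if (ok) {
        msc.commitTxn(txnId);
      } else {
        msc.abortTxns(Lists.newArrayList(txnId));   // roll back if anything failed
      }
    }
  }
}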