diff --git metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index 29d8da8082..b392e4e4ab 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -17,11 +17,17 @@
  */
 package org.apache.hadoop.hive.metastore;
 
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -125,9 +131,10 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw
     }
 
     if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
-      throw new MetaException(newTable.getDbName() + "." + newTable.getTableName() +
+      throw new MetaException(getTableName(newTable) +
           " cannot be declared transactional because it's an external table");
     }
+    validateTableStructure(context.getHandler().getMS(), newTable);
     hasValidTransactionalValue = true;
   }
 
@@ -297,4 +304,54 @@ private String validateTransactionalProperties(String transactionalProperties) {
     }
     return null; // All checks passed, return null.
   }
+  private final Pattern ORIGINAL_PATTERN = Pattern.compile("[0-9]+_[0-9]+");
+  /**
+   * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD
+   */
+  private static final Pattern ORIGINAL_PATTERN_COPY =
+    Pattern.compile("[0-9]+_[0-9]+" + "_copy_" + "[0-9]+");
+
+  /**
+   * It's assumed everywhere that original data files are named according to
+   * {@link #ORIGINAL_PATTERN} or {@link #ORIGINAL_PATTERN_COPY}.
+   * This check runs when transactional=true is being set and throws if it finds any files
+   * that don't follow the naming convention.
+   */
+  private void validateTableStructure(RawStore ms, Table table) throws MetaException {
+    Path tablePath;
+    try {
+      Warehouse wh = new Warehouse(getConf());
+      if (table.getSd().getLocation() == null || table.getSd().getLocation().isEmpty()) {
+        tablePath = wh.getDefaultTablePath(ms.getDatabase(table.getDbName()), table.getTableName());
+      } else {
+        tablePath = wh.getDnsPath(new Path(table.getSd().getLocation()));
+      }
+      FileSystem fs = wh.getFs(tablePath);
+      //FileSystem fs = FileSystem.get(getConf());
+      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(tablePath, true);
+      while (iterator.hasNext()) {
+        LocatedFileStatus fileStatus = iterator.next();
+        if (!fileStatus.isFile()) {
+          continue;
+        }
+        boolean validFile =
+          (ORIGINAL_PATTERN.matcher(fileStatus.getPath().getName()).matches() ||
+            ORIGINAL_PATTERN_COPY.matcher(fileStatus.getPath().getName()).matches()
+          );
+        if (!validFile) {
+          throw new IllegalStateException("Unexpected data file name format. Cannot convert " +
+            getTableName(table) + " to transactional table. File: " + fileStatus.getPath());
+        }
+      }
+    } catch (IOException|NoSuchObjectException e) {
+      String msg = "Unable to list files for " + getTableName(table);
+      LOG.error(msg, e);
+      MetaException e1 = new MetaException(msg);
+      e1.initCause(e);
+      throw e1;
+    }
+  }
+  private static String getTableName(Table table) {
+    return table.getDbName() + "." + table.getTableName();
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index cbbb4c47ff..eed6d22fd0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -24,6 +24,7 @@
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
@@ -31,7 +32,6 @@
 import org.apache.orc.OrcUtils;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.OrcAcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -296,9 +296,11 @@ public void next(OrcStruct next) throws IOException {
    * of these files as part of a single logical bucket file.
    *
    * Also, for unbucketed (non acid) tables, there are no guarantees where data files may be placed.
-   * For example, CTAS+Tez+Union creates subdirs 1/, 2/, etc for each leg of the Union. Thus the
-   * data file need not be an immediate child of partition dir. All files for a given writerId are
-   * treated as one logical unit to assign {@link RecordIdentifier}s to them consistently.
+   * For example, CTAS+Tez+Union creates subdirs
+   * {@link AbstractFileMergeOperator#UNION_SUDBIR_PREFIX}_1/,
+   * {@link AbstractFileMergeOperator#UNION_SUDBIR_PREFIX}_2/, etc. for each leg of the Union. Thus
+   * the data file need not be an immediate child of the partition dir. All files for a given
+   * writerId are treated as one logical unit to assign {@link RecordIdentifier}s to them consistently.
    *
    * For Compaction, where each split includes the whole bucket, this means reading over all the
    * files in order to assign ROW__ID.rowid in one sequence for the entire logical bucket.