diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index cd47a63e14..260a2f2b49 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -1066,7 +1066,7 @@ public static Directory getAcidState(Path directory, for (HdfsFileStatusWithId fswid : original) { obsolete.add(fswid.getFileStatus()); } - // Add original direcotries to obsolete list if any + // Add original directories to obsolete list if any obsolete.addAll(originalDirectories); // remove the entries so we don't get confused later and think we should // use them. @@ -1223,7 +1223,7 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi String deltaPrefix = fn.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX; ParsedDelta delta = parseDelta(child, deltaPrefix, fs); // Handle aborted deltas. Currently this can only happen for MM tables. - if (tblproperties != null && isTransactionalTable(tblproperties) && + if (tblproperties != null && isTransactionalTable(tblproperties) &&//todo: comment doesn't match - all txn tables are isTransactionalTable() ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted( delta.minWriteId, delta.maxWriteId)) { aborted.add(child); @@ -1814,7 +1814,7 @@ public static String getFullTableName(String dbName, String tableName) { } /** - * @param baseOrDeltaDir detla or base dir, must exist + * @param baseOrDeltaDir delta or base dir, must exist */ public static void createCompactorMarker(Path baseOrDeltaDir, FileSystem fs) throws IOException { /** diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 611f85a8ce..c48d25613f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -1143,7 
+1143,42 @@ public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException { } + private void startWrite() { + /** + * Generally in a FS you have to create a dir, then write something to it so if we want to + * use a _write_in_progress file, we create a delta dir (and it's immediately visible), then + * we create _write_in_progress file, i.e. there is a race condition. + * + * can we use FileSystem.createNewFile(Path) - it creates 0-length file. (not atomic). + * will this create parent folders if they don't exist? Need to test. Only if yes, can we + * use _write_in_progress file. If not, we have to use isDone. + * + * But if using isDone, it has to be added to all deltas, including MM tables. + * Do it from MoveTask? before Acid commit? + * + * Right now we use _metadata_acid to indicate it's from a compactor. So we can't use it to + * indicate is done. We could put state in this file to include version, isCompacted. + * Longer term it can become the manifest. + * + * Or we can modify the _orc_acid_version to have more in it since we write it to each dir. + * (it has a weird name for a more general file) + * + * So if we have this scheme and we read a version/manifest file that doesn't have 'new' structure, + * we assume it's written by previous version of software and thus before upgrade and so we assume + * that dir is already consistent? + * + * What about Load Data, add partition etc? These are all guarded by commit which (with S3 guard) is enough. + * + * Note that updates of a file are not consistent in S3 (w/o guard) + * So compactor creates meta file during move. + * Regular insert, during Hive.loadTable/Partition we create the file + * We have to bump the version of the file - since we'll have dirs created w/o this file. 
+ * + */ + } + private void endWrite() { + } @Override public void commitJob(JobContext context) throws IOException { JobConf conf = ShimLoader.getHadoopShims().getJobConf(context); @@ -1174,7 +1209,7 @@ public void commitJob(JobContext context) throws IOException { AcidUtils.OrcAcidVersion.writeVersionFile(newDeltaDir, fs); return; } - FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list + FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list - I bet this comment is wrong. We can have delta and delete delta and thus we do 2 renames here.... //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir //name is that we want to rename; leave it for another day // TODO: if we expect one dir why don't we enforce it?