diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 7818efbbf5..066be95b6f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1833,7 +1833,17 @@ private static Path chooseFile(Path baseOrDeltaDir, FileSystem fs) throws IOException {
       throw new IllegalArgumentException(baseOrDeltaDir + " is not a base/delta");
     }
     FileStatus[] dataFiles = fs.listStatus(new Path[] {baseOrDeltaDir}, originalBucketFilter);
-    return dataFiles != null && dataFiles.length > 0 ? dataFiles[0].getPath() : null;
+    if(dataFiles == null || dataFiles.length <= 0) {
+      return null;
+    }
+    for(FileStatus f : dataFiles) {
+      if(f.getLen() > 3) {
+        //skip an empty file, or a file with just "ORC" in it, since a Reader
+        //cannot be created from either
+        return f.getPath();
+      }
+    }
+    return null;
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 929ea9b1ed..a0370d9b54 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -1133,7 +1133,12 @@ public Options clone() {
       /* side files are only created by streaming ingest. If this is a compaction, we may
        * have an insert delta/ here with side files there because the original writer died.*/
       long length = AcidUtils.getLogicalLength(fs, fs.getFileStatus(deltaFile));
-      assert length >= 0;
+      if(length <= 3) {
+        //older streaming clients do not properly handle interrupts and may leave empty files
+        //behind (or files with nothing but "ORC" in them); it is not possible to create a
+        //Reader for them
+        continue;
+      }
       Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length));
       //must get statementId from file name since Acid 1.0 doesn't write it into bucketProperty
       ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, deltaEventOptions, conf);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 98f5df149a..7d0dbf6271 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -538,6 +538,7 @@ public void flush() throws IOException {
   public void close(boolean abort) throws IOException {
     if (abort) {
       if (flushLengths == null) {
+        //should be a delta with a single txn in it.
         if (LOG.isDebugEnabled()) {
           LOG.debug("Close on abort for path: {}.. Deleting..", path);
         }
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 611f85a8ce..5e081992ea 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -598,7 +598,6 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa
    * to use.
   * @param job the job to update
   * @param cols the columns of the table
-   * @param map
   */
  private void setColumnTypes(JobConf job, List<FieldSchema> cols) {
    StringBuilder colNames = new StringBuilder();
@@ -823,7 +822,7 @@ public String toString() {
           dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
         boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
         boolean isRawFormat = !dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
-          && AcidUtils.MetaDataFile.isRawFormat(dir, fs);//deltes can't be raw format
+          && AcidUtils.MetaDataFile.isRawFormat(dir, fs);//deletes can't be raw format
         FileStatus[] files = fs.listStatus(dir, isRawFormat ? AcidUtils.originalBucketFilter
           : AcidUtils.bucketFileFilter);
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
new file mode 100644
index 0000000000..7b5c324bae
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -0,0 +1,147 @@
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+
+public class TestTxnCommands3 extends TxnCommandsBaseForTests {
+  static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands3.class);
+  private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+      File.separator + TestTxnCommands3.class.getCanonicalName() + "-" + System.currentTimeMillis()
+  ).getPath().replaceAll("\\\\", "/");
+  @Override
+  String getTestDataDir() {
+    return TEST_DATA_DIR;
+  }
+  @Test
+  public void testRenameTable() throws Exception {
+    MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true);
+    runStatementOnDriver("drop database if exists mydb1 cascade");
+    runStatementOnDriver("drop database if exists mydb2 cascade");
+    runStatementOnDriver("create database mydb1");
+    runStatementOnDriver("create database mydb2");
+    runStatementOnDriver("create table mydb1.T(a int, b int) stored as orc");
+    runStatementOnDriver("insert into mydb1.T values(1,2),(4,5)");
+    //put something in WRITE_SET
+    runStatementOnDriver("update mydb1.T set b = 6 where b = 5");
+    runStatementOnDriver("alter table mydb1.T compact 'minor'");
+
+    runStatementOnDriver("alter table mydb1.T RENAME TO mydb1.S");
+
+    String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from mydb1.S";
+    String[][] expected = new String[][] {
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
+            "s/delta_0000001_0000001_0000/bucket_00000"},
+        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6",
+            "s/delta_0000002_0000002_0000/bucket_00000"}};
+    checkResult(expected, testQuery, false, "check data", LOG);
+
+
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='t'"));
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from COMPACTION_QUEUE where CQ_TABLE='t'"));
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from WRITE_SET where WS_TABLE='t'"));
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='t'"));
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from NEXT_WRITE_ID where NWI_TABLE='t'"));
+
+    Assert.assertEquals(
+        TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2,
+        TxnDbUtil.countQueryAgent(hiveConf,
+            "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'"));
+    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'"));
+    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from WRITE_SET where WS_TABLE='s'"));
+    Assert.assertEquals(3, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'"));
+    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from NEXT_WRITE_ID where NWI_TABLE='s'"));
+
+    runStatementOnDriver("alter table mydb1.S RENAME TO mydb2.bar");
+
+    Assert.assertEquals(
+        TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2,
+        TxnDbUtil.countQueryAgent(hiveConf,
+            "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='bar'"));
+    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from COMPACTION_QUEUE where CQ_TABLE='bar'"));
+    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from WRITE_SET where WS_TABLE='bar'"));
+    Assert.assertEquals(4, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='bar'"));
+    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from NEXT_WRITE_ID where NWI_TABLE='bar'"));
+  }
+  //todo: re-compile w/o the fix and make sure this fails
+  @Test
+  public void testCompactionEmptyFile() throws Exception {
+    MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true);
+    runStatementOnDriver("create table T(a int, b int) stored as orc");
+    runStatementOnDriver("insert into T values(1,2),(4,5)");
+    //create delta_2 (so that writeId=2 is allocated)
+    runStatementOnDriver("insert into T values(1,2),(4,5)");
+    FileSystem fs = FileSystem.get(hiveConf);
+    Path emptyFile = new Path(getWarehouseDir() + "/t/delta_0000002_0000002_0000/bucket_00000");
+    fs.delete(emptyFile);
+    //now re-create it as empty
+    FSDataOutputStream os = fs.create(emptyFile);
+    os.close();
+    os = fs.create(OrcAcidUtils.getSideFile(emptyFile));
+    os.writeLong(0);
+    os.close();
+
+    Assert.assertEquals(0, fs.getFileStatus(emptyFile).getLen());
+    String[][] expected = new String[][]{
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
+            "t/delta_0000001_0000001_0000/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t4\t5",
+            "t/delta_0000001_0000001_0000/bucket_00000"}
+    };
+    checkResult(expected, "select ROW__ID, a, b, INPUT__FILE__NAME from T order by a", false,
+        "before compaction", LOG);
+
+    runStatementOnDriver("alter table T compact 'MINOR'");
+    runWorker(hiveConf);
+
+    String[][] expected2 = new String[][]{
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
+            "t/delta_0000001_0000002/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t4\t5",
+            "t/delta_0000001_0000002/bucket_00000"}
+    };
+    checkResult(expected2, "select ROW__ID, a, b, INPUT__FILE__NAME from T order by a", false,
+        "after compaction", LOG);
+
+    Path emptyBaseFile = new Path(getWarehouseDir() + "/t/base_0000002/bucket_00000");
+    os = fs.create(emptyBaseFile);
+    os.close();
+
+    runStatementOnDriver("insert into T values(1,2),(4,5)");//create committed writeId=3
+    Path p = new Path(getWarehouseDir() + "/t/delta_0000003_0000003_0000/bucket_00000");
+    fs.delete(p);
+    os = fs.create(p);//make the file empty
+    os.close();
+
+    runStatementOnDriver("alter table T compact 'MAJOR'");
+    runWorker(hiveConf);
+    //none of this fails: OrcRawRecordMerger already has len > 0 checks for base files, and
+    //the delta-side check added in this patch skips the empty delta file
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
index 5b8ff153ae..fe2f2d9f0d 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
@@ -176,67 +176,4 @@ public void testConcatenateMM() throws Exception {
         "t/base_0000002/000000_0"}};
     checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
   }
-  @Test
-  public void testRenameTable() throws Exception {
-    MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true);
-    runStatementOnDriver("drop database if exists mydb1 cascade");
-    runStatementOnDriver("drop database if exists mydb2 cascade");
-    runStatementOnDriver("create database mydb1");
-    runStatementOnDriver("create database mydb2");
-    runStatementOnDriver("create table mydb1.T(a int, b int) stored as orc");
-    runStatementOnDriver("insert into mydb1.T values(1,2),(4,5)");
-    //put something in WRITE_SET
-    runStatementOnDriver("update mydb1.T set b = 6 where b = 5");
-    runStatementOnDriver("alter table mydb1.T compact 'minor'");
-
-    runStatementOnDriver("alter table mydb1.T RENAME TO mydb1.S");
-
-    String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from mydb1.S";
-    String[][] expected = new String[][] {
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
-            "s/delta_0000001_0000001_0000/bucket_00000"},
-        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6",
-            "s/delta_0000002_0000002_0000/bucket_00000"}};
-    checkResult(expected, testQuery, false, "check data", LOG);
-
-
-    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
-        "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='t'"));
-    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
-        "select count(*) from COMPACTION_QUEUE where CQ_TABLE='t'"));
-    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
-        "select count(*) from WRITE_SET where WS_TABLE='t'"));
-    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
-        "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='t'"));
-    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
-        "select count(*) from NEXT_WRITE_ID where NWI_TABLE='t'"));
-
-    Assert.assertEquals(
-        TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2,
-        TxnDbUtil.countQueryAgent(hiveConf,
-            "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'"));
-    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
-        "select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'"));
-    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
-        "select count(*) from WRITE_SET where WS_TABLE='s'"));
-    Assert.assertEquals(3, TxnDbUtil.countQueryAgent(hiveConf,
-        "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'"));
-    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
-        "select count(*) from NEXT_WRITE_ID where NWI_TABLE='s'"));
-
-    runStatementOnDriver("alter table mydb1.S RENAME TO mydb2.bar");
-
-    Assert.assertEquals(
-        TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2,
-        TxnDbUtil.countQueryAgent(hiveConf,
-            "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='bar'"));
-    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
-        "select count(*) from COMPACTION_QUEUE where CQ_TABLE='bar'"));
-    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
-        "select count(*) from WRITE_SET where WS_TABLE='bar'"));
-    Assert.assertEquals(4, TxnDbUtil.countQueryAgent(hiveConf,
-        "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='bar'"));
-    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
-        "select count(*) from NEXT_WRITE_ID where NWI_TABLE='bar'"));
-  }
 }
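
Note for reviewers (not part of the patch): the constant 3 in the new
`getLen() > 3` / `length <= 3` guards is the length of the ORC magic string
"ORC", which every ORC file begins with. A file holding only the magic, or
nothing at all, cannot be opened with OrcFile.createReader, so such files are
skipped rather than read. Below is a minimal, self-contained sketch of that
heuristic; the class and method names are hypothetical illustrations, not part
of Hive or ORC:

import org.apache.hadoop.fs.FileStatus;

public final class OrcLengthCheckSketch {
  //"ORC".length() == 3: an ORC file starts with these magic bytes, so a file
  //of 3 bytes or fewer cannot contain any actual data or footer
  private static final int ORC_MAGIC_LENGTH = 3;

  //mirrors the guard applied in AcidUtils.chooseFile and OrcRawRecordMerger above
  static boolean isReadableOrcFile(FileStatus stat) {
    return stat.getLen() > ORC_MAGIC_LENGTH;
  }

  private OrcLengthCheckSketch() {
  }
}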