diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 06b0209aa0..efb91bc76d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -133,7 +133,7 @@ private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException { return; } Partition p = null; - if (ci.partName != null) { + if (ci.getCis().getPartitionname() != null) { p = resolvePartition(ci); if (p == null) { // The partition was dropped before we got around to cleaning it. @@ -192,11 +192,11 @@ private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException { ValidReaderWriteIdList validWriteIdList = TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0)); - if (runJobAsSelf(ci.runAs)) { + if (runJobAsSelf(ci.getCis().getRunas())) { removeFiles(location, validWriteIdList, ci); } else { - LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName()); - UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs, + LOG.info("Cleaning as user " + ci.getCis().getRunas() + " for " + ci.getFullPartitionName()); + UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.getCis().getRunas(), UserGroupInformation.getLoginUser()); ugi.doAs(new PrivilegedExceptionAction() { @Override @@ -220,7 +220,7 @@ public Object run() throws Exception { } } private static String idWatermark(CompactionInfo ci) { - return " id=" + ci.id; + return " id=" + ci.getCis().getId(); } private void removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci) throws IOException, NoSuchObjectException { @@ -256,7 +256,7 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti } FileSystem fs = filesToDelete.get(0).getFileSystem(conf); - Database db = rs.getDatabase(getDefaultCatalog(conf), ci.dbname); + Database db = 
rs.getDatabase(getDefaultCatalog(conf), ci.getCis().getDbname()); Boolean isSourceOfRepl = ReplChangeManager.isSourceOfReplication(db); for (Path dead : filesToDelete) { diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index dc05e1990e..c43f5525bb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -156,8 +156,8 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag job.setInt(NUM_BUCKETS, sd.getNumBuckets()); job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString()); overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable - if (ci.properties != null) { - overrideTblProps(job, t.getParameters(), ci.properties); + if (ci.getCis().getProperties() != null) { + overrideTblProps(job, t.getParameters(), ci.getCis().getProperties()); } String queueName = HiveConf.getVar(job, ConfVars.COMPACTOR_JOB_QUEUE); @@ -269,7 +269,7 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor launchCompactionJob(jobMinorCompact, null, CompactionType.MINOR, null, parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle), - maxDeltastoHandle, -1, conf, msc, ci.id, jobName); + maxDeltastoHandle, -1, conf, msc, ci.getCis().getId(), jobName); } //now recompute state since we've done minor compactions and have different 'best' set of deltas dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds); @@ -308,12 +308,12 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor ". 
Compaction cannot compact above this writeId"; } LOG.error("No delta files or original files found to compact in " + sd.getLocation() + - " for compactionId=" + ci.id + minOpenInfo); + " for compactionId=" + ci.getCis().getId() + minOpenInfo); return; } - launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(), - dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, msc, ci.id, jobName); + launchCompactionJob(job, baseDir, ci.getCis().getType(), dirsToSearch, dir.getCurrentDirectories(), + dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, msc, ci.getCis().getId(), jobName); su.gatherStats(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index f45140d0d7..4ef33d07cc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -107,7 +107,7 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { * one partition. */ protected Partition resolvePartition(CompactionInfo ci) throws Exception { - if (ci.partName != null) { + if (ci.getCis().getPartitionname() != null) { List parts; try { parts = getPartitionsByNames(ci); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index a0df82cb20..9c94d12608 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -115,7 +115,7 @@ public void run() { // Check to see if this is a table level request on a partitioned table. If so, // then it's a dynamic partitioning case and we shouldn't check the table itself. 
if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 && - ci.partName == null) { + ci.getCis().getPartitionname() == null) { LOG.debug("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" + " partitioning"); continue; @@ -140,7 +140,7 @@ public void run() { // Figure out who we should run the file operations as Partition p = resolvePartition(ci); - if (p == null && ci.partName != null) { + if (p == null && ci.getCis().getPartitionname() != null) { LOG.info("Can't find partition " + ci.getFullPartitionName() + ", assuming it has been dropped and moving on."); continue; @@ -220,10 +220,10 @@ private boolean lookForCurrentCompactions(ShowCompactResponse compactions, if (compactions.getCompacts() != null) { for (ShowCompactResponseElement e : compactions.getCompacts()) { if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) && - e.getDbname().equals(ci.dbname) && - e.getTablename().equals(ci.tableName) && - (e.getPartitionname() == null && ci.partName == null || - e.getPartitionname().equals(ci.partName))) { + e.getDbname().equals(ci.getCis().getDbname()) && + e.getTablename().equals(ci.getCis().getTablename()) && + (e.getPartitionname() == null && ci.getCis().getPartitionname() == null || + e.getPartitionname().equals(ci.getCis().getPartitionname()))) { return true; } } @@ -238,7 +238,7 @@ private CompactionType checkForCompaction(final CompactionInfo ci, final String runAs) throws IOException, InterruptedException { // If it's marked as too many aborted, we already know we need to compact - if (ci.tooManyAborts) { + if (ci.getCis().isToomanyaborts()) { LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", " + "initiating major compaction"); return CompactionType.MAJOR; @@ -358,13 +358,13 @@ private long sumDirSize(FileSystem fs, Path dir) throws IOException { } private void requestCompaction(CompactionInfo ci, String runAs, CompactionType type) throws 
MetaException { - CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, type); - if (ci.partName != null) rqst.setPartitionname(ci.partName); + CompactionRequest rqst = new CompactionRequest(ci.getCis().getDbname(), ci.getCis().getTablename(), type); + if (ci.getCis().getPartitionname() != null) rqst.setPartitionname(ci.getCis().getPartitionname()); rqst.setRunas(runAs); LOG.info("Requesting compaction: " + rqst); CompactionResponse resp = txnHandler.compact(rqst); if(resp.isAccepted()) { - ci.id = resp.getId(); + ci.getCis().setId(resp.getId()); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java index 1ddc54d68a..db5a66766c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java @@ -64,7 +64,7 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { @Override Table resolveTable(CompactionInfo ci) throws MetaException { try { - return rs.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName); + return rs.getTable(getDefaultCatalog(conf), ci.getCis().getDbname(), ci.getCis().getTablename()); } catch (MetaException e) { LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage()); throw e; @@ -73,8 +73,8 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { @Override List getPartitionsByNames(CompactionInfo ci) throws MetaException { try { - return rs.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName, - Collections.singletonList(ci.partName)); + return rs.getPartitionsByNames(getDefaultCatalog(conf), ci.getCis().getDbname(), ci.getCis().getTablename(), + Collections.singletonList(ci.getCis().getPartitionname())); } catch (MetaException e) { LOG.error("Unable to get partitions by name for CompactionInfo=" + ci); 
throw e; diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java index 9678786612..d033a6f8e8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java @@ -46,7 +46,7 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { @Override Table resolveTable(CompactionInfo ci) throws MetaException { try { - return msc.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName); + return msc.getTable(getDefaultCatalog(conf), ci.getCis().getDbname(), ci.getCis().getTablename()); } catch (TException e) { LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage()); throw new MetaException(e.toString()); @@ -55,8 +55,8 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { @Override List getPartitionsByNames(CompactionInfo ci) throws MetaException { try { - return msc.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName, - Collections.singletonList(ci.partName)); + return msc.getPartitionsByNames(getDefaultCatalog(conf), ci.getCis().getDbname(), ci.getCis().getTablename(), + Collections.singletonList(ci.getCis().getPartitionname())); } catch (TException e) { LOG.error("Unable to get partitions by name for CompactionInfo=" + ci); throw new MetaException(e.toString()); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 42ccfdc2a4..db23f43362 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -133,7 +133,7 @@ public void run() { Partition p = null; try { p = resolvePartition(ci); - if (p == null && ci.partName != null) { + if (p == null && ci.getCis().getPartitionname() != null) { 
LOG.info("Unable to find partition " + ci.getFullPartitionName() + ", assuming it was dropped and moving on."); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); @@ -154,8 +154,8 @@ public void run() { continue; } String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName()); - if (ci.runAs == null) { - ci.runAs = findUserToRunAs(sd.getLocation(), t); + if (ci.getCis().getRunas() == null) { + ci.getCis().setRunas(findUserToRunAs(sd.getLocation(), t)); } /** * we cannot have Worker use HiveTxnManager (which is on ThreadLocal) since @@ -163,7 +163,7 @@ public void run() { * multiple statements in it (for query based compactor) which is not supported (and since * this case some of the statements are DDL, even in the future will not be allowed in a * multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */ - long compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION); + long compactorTxnId = msc.openTxn(ci.getCis().getRunas(), TxnType.COMPACTION); heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf); heartbeater.start(); @@ -176,7 +176,7 @@ public void run() { LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString()); conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); - ci.highestWriteId = tblValidWriteIds.getHighWatermark(); + ci.getCis().setHighestWriteId(tblValidWriteIds.getHighWatermark()); //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about //it until after any data written by it are physically removed msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), compactorTxnId); @@ -184,14 +184,14 @@ public void run() { jobName.append("-compactor-"); jobName.append(ci.getFullPartitionName()); - LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " + JavaUtils.txnIdToString(compactorTxnId)); + LOG.info("Starting " + ci.getCis().getType().toString() + " 
compaction for " + ci.getFullPartitionName() + " in " + JavaUtils.txnIdToString(compactorTxnId)); final StatsUpdater su = StatsUpdater.init(ci, msc.findColumnsWithStats( CompactionInfo.compactionInfoToStruct(ci)), conf, - runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner()); + runJobAsSelf(ci.getCis().getRunas()) ? ci.getCis().getRunas() : t.getOwner()); final CompactorMR mr = new CompactorMR(); launchedJob = true; try { - if (runJobAsSelf(ci.runAs)) { + if (runJobAsSelf(ci.getCis().getRunas())) { mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc); } else { UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(), @@ -323,10 +323,10 @@ void gatherStats() { //e.g. analyze table page_view partition(dt='10/15/2014',country=’US’) // compute statistics for columns viewtime StringBuilder sb = new StringBuilder("analyze table ") - .append(StatsUtils.getFullyQualifiedTableName(ci.dbname, ci.tableName)); - if (ci.partName != null) { + .append(StatsUtils.getFullyQualifiedTableName(ci.getCis().getDbname(), ci.getCis().getTablename())); + if (ci.getCis().getPartitionname() != null) { sb.append(" partition("); - Map partitionColumnValues = Warehouse.makeEscSpecFromName(ci.partName); + Map partitionColumnValues = Warehouse.makeEscSpecFromName(ci.getCis().getPartitionname()); for (Map.Entry ent : partitionColumnValues.entrySet()) { sb.append(ent.getKey()).append("='").append(ent.getValue()).append("',"); } @@ -363,7 +363,7 @@ void gatherStats() { } } } catch (Throwable t) { - LOG.error(ci + ": gatherStats(" + ci.dbname + "," + ci.tableName + "," + ci.partName + + LOG.error(ci + ": gatherStats(" + ci.getCis().getDbname() + "," + ci.getCis().getTablename() + "," + ci.getCis().getPartitionname() + ") failed due to: " + t.getMessage(), t); } } diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 
b28b57779b..83e2bf5cd6 100644 --- ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -82,13 +82,13 @@ public void testFindNextToCompact() throws Exception { long now = System.currentTimeMillis(); CompactionInfo ci = txnHandler.findNextToCompact("fred"); assertNotNull(ci); - assertEquals("foo", ci.dbname); - assertEquals("bar", ci.tableName); - assertEquals("ds=today", ci.partName); - assertEquals(CompactionType.MINOR, ci.type); - assertNull(ci.runAs); + assertEquals("foo", ci.getCis().getDbname()); + assertEquals("bar", ci.getCis().getTablename()); + assertEquals("ds=today", ci.getCis().getPartitionname()); + assertEquals(CompactionType.MINOR, ci.getCis().getType()); + assertNull(ci.getCis().getRunas()); assertNull(txnHandler.findNextToCompact("fred")); - ci.runAs = "bob"; + ci.getCis().setRunas("bob"); txnHandler.updateCompactorState(ci, openTxn()); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); @@ -119,20 +119,20 @@ public void testFindNextToCompact2() throws Exception { boolean expectToday = false; CompactionInfo ci = txnHandler.findNextToCompact("fred"); assertNotNull(ci); - assertEquals("foo", ci.dbname); - assertEquals("bar", ci.tableName); - if ("ds=today".equals(ci.partName)) expectToday = false; - else if ("ds=yesterday".equals(ci.partName)) expectToday = true; - else fail("partition name should have been today or yesterday but was " + ci.partName); - assertEquals(CompactionType.MINOR, ci.type); + assertEquals("foo", ci.getCis().getDbname()); + assertEquals("bar", ci.getCis().getTablename()); + if ("ds=today".equals(ci.getCis().getPartitionname())) expectToday = false; + else if ("ds=yesterday".equals(ci.getCis().getPartitionname())) expectToday = true; + else fail("partition name should have been today or yesterday but was " + ci.getCis().getPartitionname()); + assertEquals(CompactionType.MINOR, 
ci.getCis().getType()); ci = txnHandler.findNextToCompact("fred"); assertNotNull(ci); - assertEquals("foo", ci.dbname); - assertEquals("bar", ci.tableName); - if (expectToday) assertEquals("ds=today", ci.partName); - else assertEquals("ds=yesterday", ci.partName); - assertEquals(CompactionType.MINOR, ci.type); + assertEquals("foo", ci.getCis().getDbname()); + assertEquals("bar", ci.getCis().getTablename()); + if (expectToday) assertEquals("ds=today", ci.getCis().getPartitionname()); + else assertEquals("ds=yesterday", ci.getCis().getPartitionname()); + assertEquals(CompactionType.MINOR, ci.getCis().getType()); assertNull(txnHandler.findNextToCompact("fred")); @@ -372,10 +372,10 @@ public void testFindPotentialCompactions() throws Exception { assertEquals(2, potentials.size()); boolean sawMyTable = false, sawYourTable = false; for (CompactionInfo ci : potentials) { - sawMyTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("mytable") && - ci.partName == null); - sawYourTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("yourtable") && - ci.partName.equals("mypartition")); + sawMyTable |= (ci.getCis().getDbname().equals("mydb") && ci.getCis().getTablename().equals("mytable") && + ci.getCis().getPartitionname() == null); + sawYourTable |= (ci.getCis().getDbname().equals("mydb") && ci.getCis().getTablename().equals("yourtable") && + ci.getCis().getPartitionname().equals("mypartition")); } assertTrue(sawMyTable); assertTrue(sawYourTable); @@ -511,11 +511,11 @@ public void addDynamicPartitions() throws Exception { int i = 0; for (CompactionInfo ci : sorted) { - assertEquals(dbName, ci.dbname); - assertEquals(tableName, ci.tableName); + assertEquals(dbName, ci.getCis().getDbname()); + assertEquals(tableName, ci.getCis().getTablename()); switch (i++) { - case 0: assertEquals("ds=today", ci.partName); break; - case 1: assertEquals("ds=yesterday", ci.partName); break; + case 0: assertEquals("ds=today", ci.getCis().getPartitionname()); break; + case 1: 
assertEquals("ds=yesterday", ci.getCis().getPartitionname()); break; default: throw new RuntimeException("What?"); } } diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index 46c79d680e..4f018f8939 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -59,7 +59,7 @@ public void cleanupAfterMajorTableCompaction() throws Exception { CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR); txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); + ci.getCis().setRunas(System.getProperty("user.name")); txnHandler.updateCompactorState(ci, openTxn()); txnHandler.markCompacted(ci); @@ -92,7 +92,7 @@ public void cleanupAfterMajorPartitionCompaction() throws Exception { rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); + ci.getCis().setRunas(System.getProperty("user.name")); txnHandler.updateCompactorState(ci, openTxn()); txnHandler.markCompacted(ci); @@ -123,7 +123,7 @@ public void cleanupAfterMinorTableCompaction() throws Exception { CompactionRequest rqst = new CompactionRequest("default", "camitc", CompactionType.MINOR); txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); + ci.getCis().setRunas(System.getProperty("user.name")); txnHandler.updateCompactorState(ci, openTxn()); txnHandler.markCompacted(ci); @@ -163,7 +163,7 @@ public void cleanupAfterMinorPartitionCompaction() throws Exception { rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); + 
ci.getCis().setRunas(System.getProperty("user.name")); txnHandler.updateCompactorState(ci, openTxn()); txnHandler.markCompacted(ci); @@ -203,7 +203,7 @@ public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception { txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); txnHandler.markCompacted(ci); - ci.runAs = System.getProperty("user.name"); + ci.getCis().setRunas(System.getProperty("user.name")); txnHandler.updateCompactorState(ci, openTxn()); startCleaner(); @@ -232,7 +232,7 @@ public void droppedTable() throws Exception { CompactionRequest rqst = new CompactionRequest("default", "dt", CompactionType.MINOR); txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); + ci.getCis().setRunas(System.getProperty("user.name")); txnHandler.updateCompactorState(ci, openTxn()); txnHandler.markCompacted(ci); @@ -261,7 +261,7 @@ public void droppedPartition() throws Exception { rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); - ci.runAs = System.getProperty("user.name"); + ci.getCis().setRunas(System.getProperty("user.name")); txnHandler.updateCompactorState(ci, openTxn()); txnHandler.markCompacted(ci); diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java index ea70503988..1588b55a4d 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java @@ -31,32 +31,20 @@ * Information on a possible or running compaction. 
*/ public class CompactionInfo implements Comparable { - - /** - * Modifying this variables or adding new ones should be done in sync - * with the static methods {@code compactionStructToInfo()} and - * {@code compactionInfoToStruct()}. This class is going to be deserialized - * and serialized so missing this may result in the value of the field - * being resetted. This will be fixed at HIVE-21056. - */ - public long id; - public String dbname; - public String tableName; - public String partName; - public char state; - public CompactionType type; - public String workerId; - public long start; - public String runAs; - public String properties; - public boolean tooManyAborts = false; /** - * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL) + * This encapsulates the part of this class that can be serialized. + * + * For highestWriteId(from CompactionInfoStruct) {@code 0} means it wasn't set (e.g. in case of upgrades, + * since ResultSet.getLong() will return 0 if field is NULL) * See {@link TxnStore#setCompactionHighestWriteId(CompactionInfo, long)} for precise definition. * See also {@link TxnUtils#createValidCompactWriteIdList(TableValidWriteIds)} and * {@link ValidCompactorWriteIdList#highWatermark}. 
+ * + * writeIds(from CompactionInfoStruct) is used for the compactions of type 'p' */ - public long highestWriteId; + + private final CompactionInfoStruct cis; + byte[] metaInfo; String hadoopJobId; @@ -64,26 +52,34 @@ private String fullTableName = null; public CompactionInfo(String dbname, String tableName, String partName, CompactionType type) { - this.dbname = dbname; - this.tableName = tableName; - this.partName = partName; - this.type = type; + cis = new CompactionInfoStruct(); + cis.setDbname(dbname); + cis.setTablename(tableName); + cis.setType(type); + cis.setPartitionname(partName); } CompactionInfo(long id, String dbname, String tableName, String partName, char state) { this(dbname, tableName, partName, null); - this.id = id; - this.state = state; + cis.setId(id); + cis.setState(Character.toString(state)); + } + + CompactionInfo(CompactionInfoStruct cis) { + this.cis = cis; + } + + CompactionInfo() { + cis = new CompactionInfoStruct(); } - CompactionInfo() {} public String getFullPartitionName() { if (fullPartitionName == null) { - StringBuilder buf = new StringBuilder(dbname); + StringBuilder buf = new StringBuilder(cis.getDbname()); buf.append('.'); - buf.append(tableName); - if (partName != null) { + buf.append(cis.getTablename()); + if (cis.getPartitionname() != null) { buf.append('.'); - buf.append(partName); + buf.append(cis.getPartitionname()); } fullPartitionName = buf.toString(); } @@ -92,15 +88,16 @@ public String getFullPartitionName() { public String getFullTableName() { if (fullTableName == null) { - StringBuilder buf = new StringBuilder(dbname); + StringBuilder buf = new StringBuilder(cis.getDbname()); buf.append('.'); - buf.append(tableName); + buf.append(cis.getTablename()); fullTableName = buf.toString(); } return fullTableName; } + public boolean isMajorCompaction() { - return CompactionType.MAJOR == type; + return CompactionType.MAJOR == cis.getType(); } @Override @@ -108,16 +105,16 @@ public int compareTo(CompactionInfo o) { return 
getFullPartitionName().compareTo(o.getFullPartitionName()); } public String toString() { - return "id:" + id + "," + - "dbname:" + dbname + "," + - "tableName:" + tableName + "," + - "partName:" + partName + "," + - "state:" + state + "," + - "type:" + type + "," + - "properties:" + properties + "," + - "runAs:" + runAs + "," + - "tooManyAborts:" + tooManyAborts + "," + - "highestWriteId:" + highestWriteId; + return "id:" + cis.getId() + "," + + "dbname:" + cis.getDbname() + "," + + "tableName:" + cis.getTablename() + "," + + "partName:" + cis.getPartitionname() + "," + + "state:" + cis.getState() + "," + + "type:" + cis.getType() + "," + + "properties:" + cis.getProperties() + "," + + "runAs:" + cis.getRunas() + "," + + "tooManyAborts:" + cis.isToomanyaborts() + "," + + "highestWriteId:" + cis.getHighestWriteId(); } @Override @@ -146,78 +143,47 @@ public boolean equals(Object obj) { */ static CompactionInfo loadFullFromCompactionQueue(ResultSet rs) throws SQLException { CompactionInfo fullCi = new CompactionInfo(); - fullCi.id = rs.getLong(1); - fullCi.dbname = rs.getString(2); - fullCi.tableName = rs.getString(3); - fullCi.partName = rs.getString(4); - fullCi.state = rs.getString(5).charAt(0);//cq_state - fullCi.type = TxnHandler.dbCompactionType2ThriftType(rs.getString(6).charAt(0)); - fullCi.properties = rs.getString(7); - fullCi.workerId = rs.getString(8); - fullCi.start = rs.getLong(9); - fullCi.runAs = rs.getString(10); - fullCi.highestWriteId = rs.getLong(11); + fullCi.getCis().setId(rs.getLong(1)); + fullCi.getCis().setDbname(rs.getString(2)); + fullCi.getCis().setTablename(rs.getString(3)); + fullCi.getCis().setPartitionname(rs.getString(4)); + fullCi.getCis().setState(rs.getString(5));//cq_state + fullCi.getCis().setType(TxnHandler.dbCompactionType2ThriftType(rs.getString(6).charAt(0))); + fullCi.getCis().setProperties(rs.getString(7)); + fullCi.getCis().setWorkerId(rs.getString(8)); + fullCi.getCis().setStart(rs.getLong(9)); + 
fullCi.getCis().setRunas(rs.getString(10)); + fullCi.getCis().setHighestWriteId(rs.getLong(11)); fullCi.metaInfo = rs.getBytes(12); fullCi.hadoopJobId = rs.getString(13); return fullCi; } static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException { - pStmt.setLong(1, ci.id); - pStmt.setString(2, ci.dbname); - pStmt.setString(3, ci.tableName); - pStmt.setString(4, ci.partName); - pStmt.setString(5, Character.toString(ci.state)); - pStmt.setString(6, Character.toString(TxnHandler.thriftCompactionType2DbType(ci.type))); - pStmt.setString(7, ci.properties); - pStmt.setString(8, ci.workerId); - pStmt.setLong(9, ci.start); + pStmt.setLong(1, ci.getCis().getId()); + pStmt.setString(2, ci.getCis().getDbname()); + pStmt.setString(3, ci.getCis().getTablename()); + pStmt.setString(4, ci.getCis().getPartitionname()); + pStmt.setString(5, ci.getCis().getState()); + pStmt.setString(6, Character.toString(TxnHandler.thriftCompactionType2DbType(ci.getCis().getType()))); + pStmt.setString(7, ci.getCis().getProperties()); + pStmt.setString(8, ci.getCis().getWorkerId()); + pStmt.setLong(9, ci.getCis().getStart()); pStmt.setLong(10, endTime); - pStmt.setString(11, ci.runAs); - pStmt.setLong(12, ci.highestWriteId); + pStmt.setString(11, ci.getCis().getRunas()); + pStmt.setLong(12, ci.getCis().getHighestWriteId()); pStmt.setBytes(13, ci.metaInfo); pStmt.setString(14, ci.hadoopJobId); } public static CompactionInfo compactionStructToInfo(CompactionInfoStruct cr) { - if (cr == null) { - return null; - } - CompactionInfo ci = new CompactionInfo(cr.getDbname(), cr.getTablename(), cr.getPartitionname(), cr.getType()); - ci.id = cr.getId(); - ci.runAs = cr.getRunas(); - ci.properties = cr.getProperties(); - if (cr.isSetToomanyaborts()) { - ci.tooManyAborts = cr.isToomanyaborts(); - } - if (cr.isSetState() && cr.getState().length() != 1) { - throw new IllegalStateException("State should only be one character but it was set to " + 
cr.getState()); - } else if (cr.isSetState()) { - ci.state = cr.getState().charAt(0); - } - ci.workerId = cr.getWorkerId(); - if (cr.isSetStart()) { - ci.start = cr.getStart(); - } - if (cr.isSetHighestWriteId()) { - ci.highestWriteId = cr.getHighestWriteId(); - } - return ci; + return cr == null ? null : new CompactionInfo(cr); } public static CompactionInfoStruct compactionInfoToStruct(CompactionInfo ci) { if (ci == null) { return null; } - CompactionInfoStruct cr = new CompactionInfoStruct(ci.id, ci.dbname, ci.tableName, ci.type); - cr.setPartitionname(ci.partName); - cr.setRunas(ci.runAs); - cr.setProperties(ci.properties); - cr.setToomanyaborts(ci.tooManyAborts); - cr.setStart(ci.start); - cr.setState(Character.toString(ci.state)); - cr.setWorkerId(ci.workerId); - cr.setHighestWriteId(ci.highestWriteId); - return cr; + return ci.getCis(); } public static OptionalCompactionInfoStruct compactionInfoToOptionalStruct(CompactionInfo ci) { @@ -234,4 +200,8 @@ public static CompactionInfo optionalCompactionInfoStructToInfo(OptionalCompacti } return null; } + + public CompactionInfoStruct getCis() { + return cis; + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 8253ccb9c9..b841a7bfa0 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -76,9 +76,9 @@ public CompactionTxnHandler() { rs = stmt.executeQuery(s); while (rs.next()) { CompactionInfo info = new CompactionInfo(); - info.dbname = rs.getString(1); - info.tableName = rs.getString(2); - info.partName = rs.getString(3); + info.getCis().setDbname(rs.getString(1)); + info.getCis().setTablename(rs.getString(2)); + 
info.getCis().setPartitionname(rs.getString(3)); response.add(info); } rs.close(); @@ -94,10 +94,10 @@ public CompactionTxnHandler() { rs = stmt.executeQuery(s); while (rs.next()) { CompactionInfo info = new CompactionInfo(); - info.dbname = rs.getString(1); - info.tableName = rs.getString(2); - info.partName = rs.getString(3); - info.tooManyAborts = true; + info.getCis().setDbname(rs.getString(1)); + info.getCis().setTablename(rs.getString(2)); + info.getCis().setPartitionname(rs.getString(3)); + info.getCis().setToomanyaborts(true); response.add(info); } @@ -145,16 +145,16 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { updStmt = dbConn.createStatement(); do { CompactionInfo info = new CompactionInfo(); - info.id = rs.getLong(1); - info.dbname = rs.getString(2); - info.tableName = rs.getString(3); - info.partName = rs.getString(4); - info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0)); - info.properties = rs.getString(6); + info.getCis().setId(rs.getLong(1)); + info.getCis().setDbname(rs.getString(2)); + info.getCis().setTablename(rs.getString(3)); + info.getCis().setPartitionname(rs.getString(4)); + info.getCis().setType(dbCompactionType2ThriftType(rs.getString(5).charAt(0))); + info.getCis().setProperties(rs.getString(6)); // Now, update this record as being worked on by this worker. 
long now = getDbTime(dbConn); s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + - "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id + + "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.getCis().getId() + " AND cq_state='" + INITIATED_STATE + "'"; LOG.debug("Going to execute update <" + s + ">"); int updCount = updStmt.executeUpdate(s); @@ -204,7 +204,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + - "cq_worker_id = null where cq_id = " + info.id; + "cq_worker_id = null where cq_id = " + info.getCis().getId(); LOG.debug("Going to execute update <" + s + ">"); int updCnt = stmt.executeUpdate(s); if (updCnt != 1) { @@ -254,17 +254,17 @@ public void markCompacted(CompactionInfo info) throws MetaException { rs = stmt.executeQuery(s); while (rs.next()) { CompactionInfo info = new CompactionInfo(); - info.id = rs.getLong(1); - info.dbname = rs.getString(2); - info.tableName = rs.getString(3); - info.partName = rs.getString(4); + info.getCis().setId(rs.getLong(1)); + info.getCis().setDbname(rs.getString(2)); + info.getCis().setTablename(rs.getString(3)); + info.getCis().setPartitionname(rs.getString(4)); switch (rs.getString(5).charAt(0)) { - case MAJOR_TYPE: info.type = CompactionType.MAJOR; break; - case MINOR_TYPE: info.type = CompactionType.MINOR; break; + case MAJOR_TYPE: info.getCis().setType(CompactionType.MAJOR); break; + case MINOR_TYPE: info.getCis().setType(CompactionType.MINOR); break; default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); } - info.runAs = rs.getString(6); - info.highestWriteId = rs.getLong(7); + info.getCis().setRunas(rs.getString(6)); + info.getCis().setHighestWriteId(rs.getLong(7)); rc.add(info); } LOG.debug("Going to 
rollback"); @@ -352,18 +352,18 @@ public void markCleaned(CompactionInfo info) throws MetaException { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?"); - pStmt.setLong(1, info.id); + pStmt.setLong(1, info.getCis().getId()); rs = pStmt.executeQuery(); if(rs.next()) { info = CompactionInfo.loadFullFromCompactionQueue(rs); } else { - throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE"); + throw new IllegalStateException("No record with CQ_ID=" + info.getCis().getId() + " found in COMPACTION_QUEUE"); } close(rs); String s = "delete from COMPACTION_QUEUE where cq_id = ?"; pStmt = dbConn.prepareStatement(s); - pStmt.setLong(1, info.id); + pStmt.setLong(1, info.getCis().getId()); LOG.debug("Going to execute update <" + s + ">"); int updCount = pStmt.executeUpdate(); if (updCount != 1) { @@ -372,7 +372,7 @@ public void markCleaned(CompactionInfo info) throws MetaException { dbConn.rollback(); } pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); - info.state = SUCCEEDED_STATE; + info.getCis().setState(Character.toString(SUCCEEDED_STATE)); CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn)); updCount = pStmt.executeUpdate(); @@ -381,21 +381,21 @@ public void markCleaned(CompactionInfo info) throws MetaException { //highestWriteId will be NULL in upgrade scenarios s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? 
and " + "ctc_table = ?"; - if (info.partName != null) { + if (info.getCis().getPartitionname() != null) { s += " and ctc_partition = ?"; } - if(info.highestWriteId != 0) { + if(info.getCis().getHighestWriteId() != 0) { s += " and ctc_writeid <= ?"; } pStmt = dbConn.prepareStatement(s); int paramCount = 1; - pStmt.setString(paramCount++, info.dbname); - pStmt.setString(paramCount++, info.tableName); - if (info.partName != null) { - pStmt.setString(paramCount++, info.partName); + pStmt.setString(paramCount++, info.getCis().getDbname()); + pStmt.setString(paramCount++, info.getCis().getTablename()); + if (info.getCis().getPartitionname() != null) { + pStmt.setString(paramCount++, info.getCis().getPartitionname()); } - if(info.highestWriteId != 0) { - pStmt.setLong(paramCount++, info.highestWriteId); + if(info.getCis().getHighestWriteId() != 0) { + pStmt.setLong(paramCount++, info.getCis().getHighestWriteId()); } LOG.debug("Going to execute update <" + s + ">"); if ((updCount = pStmt.executeUpdate()) < 1) { @@ -410,18 +410,18 @@ public void markCleaned(CompactionInfo info) throws MetaException { */ s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' and tc_database = ?
and tc_table = ?"; - if (info.highestWriteId != 0) s += " and tc_writeid <= ?"; - if (info.partName != null) s += " and tc_partition = ?"; + if (info.getCis().getHighestWriteId() != 0) s += " and tc_writeid <= ?"; + if (info.getCis().getPartitionname() != null) s += " and tc_partition = ?"; pStmt = dbConn.prepareStatement(s); paramCount = 1; - pStmt.setString(paramCount++, info.dbname); - pStmt.setString(paramCount++, info.tableName); - if(info.highestWriteId != 0) { - pStmt.setLong(paramCount++, info.highestWriteId); + pStmt.setString(paramCount++, info.getCis().getDbname()); + pStmt.setString(paramCount++, info.getCis().getTablename()); + if(info.getCis().getHighestWriteId() != 0) { + pStmt.setLong(paramCount++, info.getCis().getHighestWriteId()); } - if (info.partName != null) { - pStmt.setString(paramCount++, info.partName); + if (info.getCis().getPartitionname() != null) { + pStmt.setString(paramCount++, info.getCis().getPartitionname()); } LOG.debug("Going to execute update <" + s + ">"); @@ -446,7 +446,7 @@ public void markCleaned(CompactionInfo info) throws MetaException { //because 1 txn may include different partitions/tables even in auto commit mode suffix.append(" and tc_database = ?"); suffix.append(" and tc_table = ?"); - if (info.partName != null) { + if (info.getCis().getPartitionname() != null) { suffix.append(" and tc_partition = ?"); } @@ -466,10 +466,10 @@ public void markCleaned(CompactionInfo info) throws MetaException { } totalCount += insertCount; paramCount = insertCount + 1; - pStmt.setString(paramCount++, info.dbname); - pStmt.setString(paramCount++, info.tableName); - if (info.partName != null) { - pStmt.setString(paramCount++, info.partName); + pStmt.setString(paramCount++, info.getCis().getDbname()); + pStmt.setString(paramCount++, info.getCis().getTablename()); + if (info.getCis().getPartitionname() != null) { + pStmt.setString(paramCount++, info.getCis().getPartitionname()); } int rc = pStmt.executeUpdate(); LOG.debug("Removed " + rc 
+ " records from txn_components"); @@ -725,21 +725,21 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { StringBuilder bldr = new StringBuilder(); bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote) .append(" FROM ") - .append(quote).append((ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS")) + .append(quote).append((ci.getCis().getPartitionname() == null ? "TAB_COL_STATS" : "PART_COL_STATS")) .append(quote) .append(" WHERE ") .append(quote).append("DB_NAME").append(quote).append(" = ?") .append(" AND ").append(quote).append("TABLE_NAME").append(quote) .append(" = ?"); - if (ci.partName != null) { + if (ci.getCis().getPartitionname() != null) { bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = ?"); } String s = bldr.toString(); pStmt = dbConn.prepareStatement(s); - pStmt.setString(1, ci.dbname); - pStmt.setString(2, ci.tableName); - if (ci.partName != null) { - pStmt.setString(3, ci.partName); + pStmt.setString(1, ci.getCis().getDbname()); + pStmt.setString(2, ci.getCis().getTablename()); + if (ci.getCis().getPartitionname() != null) { + pStmt.setString(3, ci.getCis().getPartitionname()); } /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" : @@ -752,14 +752,14 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { while (rs.next()) { columns.add(rs.getString(1)); } - LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName + - (ci.partName == null ? "" : "/" + ci.partName)); + LOG.debug("Found columns to update stats: " + columns + " on " + ci.getCis().getTablename() + + (ci.getCis().getPartitionname() == null ? "" : "/" + ci.getCis().getPartitionname())); dbConn.commit(); return columns; } catch (SQLException e) { rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "findColumnsWithStats(" + ci.tableName + - (ci.partName == null ? 
"" : "/" + ci.partName) + ")"); + checkRetryable(dbConn, e, "findColumnsWithStats(" + ci.getCis().getTablename() + + (ci.getCis().getPartitionname() == null ? "" : "/" + ci.getCis().getPartitionname()) + ")"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -778,8 +778,8 @@ public void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String sqlText = "UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_WRITE_ID = " + - ci.highestWriteId + ", cq_run_as = " + quoteString(ci.runAs) + - " WHERE CQ_ID = " + ci.id; + ci.getCis().getHighestWriteId() + ", cq_run_as = " + quoteString(ci.getCis().getRunas()) + + " WHERE CQ_ID = " + ci.getCis().getId(); if(LOG.isDebugEnabled()) { LOG.debug("About to execute: " + sqlText); } @@ -799,15 +799,15 @@ public void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws "TC_TXNID, " + "TC_DATABASE, " + "TC_TABLE, " + - (ci.partName == null ? "" : "TC_PARTITION, ") + + (ci.getCis().getPartitionname() == null ? "" : "TC_PARTITION, ") + "TC_WRITEID, " + "TC_OPERATION_TYPE)" + " VALUES(" + compactionTxnId + "," + - quoteString(ci.dbname) + "," + - quoteString(ci.tableName) + "," + - (ci.partName == null ? "" : quoteString(ci.partName) + ",") + - ci.highestWriteId + ", " + + quoteString(ci.getCis().getDbname()) + "," + + quoteString(ci.getCis().getTablename()) + "," + + (ci.getCis().getPartitionname() == null ?
"" : quoteString(ci.getCis().getPartitionname()) + ",") + + ci.getCis().getHighestWriteId() + ", " + quoteChar(OperationType.COMPACT.getSqlConst()) + ")"; if(LOG.isDebugEnabled()) { LOG.debug("About to execute: " + sqlText); @@ -840,20 +840,20 @@ public void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws } } private void checkForDeletion(List deleteSet, CompactionInfo ci, RetentionCounters rc) { - switch (ci.state) { + switch (ci.getCis().getState().charAt(0)) { case ATTEMPTED_STATE: if(--rc.attemptedRetention < 0) { - deleteSet.add(ci.id); + deleteSet.add(ci.getCis().getId()); } break; case FAILED_STATE: if(--rc.failedRetention < 0) { - deleteSet.add(ci.id); + deleteSet.add(ci.getCis().getId()); } break; case SUCCEEDED_STATE: if(--rc.succeededRetention < 0) { - deleteSet.add(ci.id); + deleteSet.add(ci.getCis().getId()); } break; default: @@ -983,12 +983,12 @@ public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { pStmt = dbConn.prepareStatement("select CC_STATE from COMPLETED_COMPACTIONS where " + "CC_DATABASE = ? and " + "CC_TABLE = ? " + - (ci.partName != null ? "and CC_PARTITION = ?" : "") + + (ci.getCis().getPartitionname() != null ? "and CC_PARTITION = ?" 
: "") + " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc"); - pStmt.setString(1, ci.dbname); - pStmt.setString(2, ci.tableName); - if (ci.partName != null) { - pStmt.setString(3, ci.partName); + pStmt.setString(1, ci.getCis().getDbname()); + pStmt.setString(2, ci.getCis().getTablename()); + if (ci.getCis().getPartitionname() != null) { + pStmt.setString(3, ci.getCis().getPartitionname()); } rs = pStmt.executeQuery(); int numFailed = 0; @@ -1037,35 +1037,35 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?"); - pStmt.setLong(1, ci.id); + pStmt.setLong(1, ci.getCis().getId()); rs = pStmt.executeQuery(); if(rs.next()) { ci = CompactionInfo.loadFullFromCompactionQueue(rs); String s = "delete from COMPACTION_QUEUE where cq_id = ?"; pStmt = dbConn.prepareStatement(s); - pStmt.setLong(1, ci.id); + pStmt.setLong(1, ci.getCis().getId()); LOG.debug("Going to execute update <" + s + ">"); int updCnt = pStmt.executeUpdate(); } else { - if(ci.id > 0) { + if(ci.getCis().getId() > 0) { //the record with valid CQ_ID has disappeared - this is a sign of something wrong - throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE"); + throw new IllegalStateException("No record with CQ_ID=" + ci.getCis().getId() + " found in COMPACTION_QUEUE"); } } - if(ci.id == 0) { + if(ci.getCis().getId() == 0) { //The failure occurred before we even made an entry in COMPACTION_QUEUE //generate ID so that we can make an entry in COMPLETED_COMPACTIONS - ci.id = generateCompactionQueueId(stmt); + ci.getCis().setId(generateCompactionQueueId(stmt)); //mostly this indicates that 
the Initiator is paying attention to some table even though //compactions are not happening. - ci.state = ATTEMPTED_STATE; + ci.getCis().setState(Character.toString(ATTEMPTED_STATE)); //this is not strictly accurate, but 'type' cannot be null. - if(ci.type == null) { ci.type = CompactionType.MINOR; } - ci.start = getDbTime(dbConn); + if(!ci.getCis().isSetType()) { ci.getCis().setType(CompactionType.MINOR); } + ci.getCis().setStart(getDbTime(dbConn)); } else { - ci.state = FAILED_STATE; + ci.getCis().setState(Character.toString(FAILED_STATE)); } close(rs, stmt, null); closeStmt(pStmt); @@ -1077,7 +1077,7 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho closeStmt(pStmt); dbConn.commit(); } catch (SQLException e) { - LOG.warn("markFailed(" + ci.id + "):" + e.getMessage()); + LOG.warn("markFailed(" + ci.getCis().getId() + "):" + e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); try {