diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index c47e78e09f..fe0aaa4ff5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -233,9 +233,10 @@ public JobConf getMrJob() { static final private Logger LOG = LoggerFactory.getLogger(StatsUpdater.class); public static StatsUpdater init(CompactionInfo ci, List columnListForStats, - HiveConf conf, String userName) { + HiveConf conf, String userName) { return new StatsUpdater(ci, columnListForStats, conf, userName); } + /** * list columns for which to compute stats. This maybe empty which means no stats gathering * is needed. @@ -246,11 +247,11 @@ public static StatsUpdater init(CompactionInfo ci, List columnListForSta private final CompactionInfo ci; private StatsUpdater(CompactionInfo ci, List columnListForStats, - HiveConf conf, String userName) { + HiveConf conf, String userName) { this.conf = conf; this.userName = userName; this.ci = ci; - if(!ci.isMajorCompaction() || columnListForStats == null || columnListForStats.isEmpty()) { + if (!ci.isMajorCompaction() || columnListForStats == null || columnListForStats.isEmpty()) { columnList = Collections.emptyList(); return; } @@ -258,58 +259,65 @@ private StatsUpdater(CompactionInfo ci, List columnListForStats, } /** - * todo: what should this do on failure? Should it rethrow? Invalidate stats? + * This doesn't throw any exceptions because we don't want the Compaction to appear as failed + * if stats gathering fails since this prevents Cleaner from doing it's job and if there are + * multiple failures, auto initiated compactions will stop which leads to problems that are + * much worse than stale stats. + * + * todo: longer term we should write something COMPACTION_QUEUE.CQ_META_INFO. This is a binary + * field so need to figure out the msg format and how to surface it in SHOW COMPACTIONS, etc */ - void gatherStats() throws IOException { - if(!ci.isMajorCompaction()) { - return; - } - if(columnList.isEmpty()) { - LOG.debug("No existing stats for " - + StatsUtils.getFullyQualifiedTableName(ci.dbname, ci.tableName) - + " found. Will not run analyze."); - return;//nothing to do - } - //e.g. analyze table page_view partition(dt='10/15/2014',country=’US’) - // compute statistics for columns viewtime - StringBuilder sb = new StringBuilder("analyze table ") - .append(StatsUtils.getFullyQualifiedTableName(ci.dbname, ci.tableName)); - if(ci.partName != null) { - try { + void gatherStats() { + try { + if (!ci.isMajorCompaction()) { + return; + } + if (columnList.isEmpty()) { + LOG.debug(ci + ": No existing stats found. Will not run analyze."); + return;//nothing to do + } + //e.g. analyze table page_view partition(dt='10/15/2014',country=’US’) + // compute statistics for columns viewtime + StringBuilder sb = new StringBuilder("analyze table ") + .append(StatsUtils.getFullyQualifiedTableName(ci.dbname, ci.tableName)); + if (ci.partName != null) { sb.append(" partition("); Map partitionColumnValues = Warehouse.makeEscSpecFromName(ci.partName); - for(Map.Entry ent : partitionColumnValues.entrySet()) { + for (Map.Entry ent : partitionColumnValues.entrySet()) { sb.append(ent.getKey()).append("='").append(ent.getValue()).append("',"); } sb.setLength(sb.length() - 1); //remove trailing , sb.append(")"); } - catch(MetaException ex) { - throw new IOException(ex); + sb.append(" compute statistics for columns "); + for (String colName : columnList) { + sb.append(colName).append(","); } - } - sb.append(" compute statistics for columns "); - for(String colName : columnList) { - sb.append(colName).append(","); - } - sb.setLength(sb.length() - 1); //remove trailing , - LOG.info("running '" + sb.toString() + "'"); - Driver d = new Driver(new QueryState.Builder().withGenerateNewQueryId(true).withHiveConf(conf).build(), userName); - SessionState localSession = null; - if(SessionState.get() == null) { - localSession = SessionState.start(new SessionState(conf)); - } - try { - CommandProcessorResponse cpr = d.run(sb.toString()); - if (cpr.getResponseCode() != 0) { - throw new IOException("Could not update stats for table " + ci.getFullTableName() + - (ci.partName == null ? "" : "/" + ci.partName) + " due to: " + cpr); - } - } - finally { - if(localSession != null) { - localSession.close(); + sb.setLength(sb.length() - 1); //remove trailing , + LOG.info(ci + ": running '" + sb.toString() + "'"); + Driver d = new Driver(new QueryState.Builder().withGenerateNewQueryId(true).withHiveConf(conf).build(), userName); + SessionState localSession = null; + try { + if (SessionState.get() == null) { + localSession = new SessionState(conf); + SessionState.start(localSession); + } + CommandProcessorResponse cpr = d.run(sb.toString()); + if (cpr.getResponseCode() != 0) { + LOG.warn(ci + ": " + sb.toString() + " failed due to: " + cpr); + } + } finally { + if (localSession != null) { + try { + localSession.close(); + } catch (IOException ex) { + LOG.warn(ci + ": localSession.close() failed due to: " + ex.getMessage(), ex); + } + } } + } catch (Throwable t) { + LOG.error(ci + ": gatherStats(" + ci.dbname + "," + ci.tableName + "," + ci.partName + + ") failed due to: " + t.getMessage(), t); } } }