Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 739005)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy)
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
+import org.apache.hadoop.hive.ql.exec.FilterOperator.Counter;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -50,6 +51,14 @@
   transient protected Path finalPath;
   transient protected Serializer serializer;
   transient protected BytesWritable commonKey = new BytesWritable();
+  transient protected TableIdEnum tabIdEnum = null;
+  transient private LongWritable row_count;
+  public static enum TableIdEnum {
+
+    TABLE_ID_1_ROWCOUNT, TABLE_ID_2_ROWCOUNT, TABLE_ID_3_ROWCOUNT, TABLE_ID_4_ROWCOUNT, TABLE_ID_5_ROWCOUNT, TABLE_ID_6_ROWCOUNT, TABLE_ID_7_ROWCOUNT, TABLE_ID_8_ROWCOUNT, TABLE_ID_9_ROWCOUNT, TABLE_ID_10_ROWCOUNT, TABLE_ID_11_ROWCOUNT, TABLE_ID_12_ROWCOUNT, TABLE_ID_13_ROWCOUNT, TABLE_ID_14_ROWCOUNT, TABLE_ID_15_ROWCOUNT;
+
+  }
+
   private void commit() throws IOException {
     fs.rename(outPath, finalPath);
@@ -82,6 +91,7 @@
       serializer = (Serializer)conf.getTableInfo().getDeserializerClass().newInstance();
       serializer.initialize(null, conf.getTableInfo().getProperties());
+
       JobConf jc;
       if(hconf instanceof JobConf) {
         jc = (JobConf)hconf;
@@ -90,6 +100,14 @@
         jc = new JobConf(hconf, ExecDriver.class);
       }
+      int id = conf.getDestTableId();
+      if ((id != 0) && (id <= TableIdEnum.values().length)){
+        String enumName = "TABLE_ID_"+String.valueOf(id)+"_ROWCOUNT";
+        tabIdEnum = TableIdEnum.valueOf(enumName);
+        row_count = new LongWritable();
+        statsMap.put(tabIdEnum, row_count);
+
+      }
       fs = FileSystem.get(hconf);
       finalPath = new Path(conf.getDirName(), Utilities.getTaskId(hconf));
       outPath = new Path(conf.getDirName(), "_tmp."+Utilities.getTaskId(hconf));
@@ -159,6 +177,10 @@
       reporter.progress();
       // user SerDe to serialize r, and write it out
       recordValue = serializer.serialize(row, rowInspector);
+      if (row_count != null){
+        row_count.set(row_count.get()+ 1);
+      }
+
       outWriter.write(recordValue);
     } catch (IOException e) {
       throw new HiveException (e);
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/fileSinkDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/fileSinkDesc.java (revision 739005)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/fileSinkDesc.java (working copy)
@@ -26,18 +26,32 @@
   private String dirName;
   private tableDesc tableInfo;
   private boolean compressed;
+  private int destTableId;
+
   public fileSinkDesc() { }
   public fileSinkDesc(
     final String dirName,
     final tableDesc tableInfo,
-    final boolean compressed) {
+    final boolean compressed, int destTableId) {
     this.dirName = dirName;
     this.tableInfo = tableInfo;
     this.compressed = compressed;
+    this.destTableId = destTableId;
   }
+
+  public fileSinkDesc(
+    final String dirName,
+    final tableDesc tableInfo,
+    final boolean compressed) {
+
+    this.dirName = dirName;
+    this.tableInfo = tableInfo;
+    this.compressed = compressed;
+    this.destTableId = 0;
+  }
+
   @explain(displayName="directory", normalExplain=false)
   public String getDirName() {
     return this.dirName;
@@ -62,4 +76,13 @@
   public void setCompressed(boolean compressed) {
     this.compressed = compressed;
   }
+
+  @explain(displayName="GlobalTableId")
+  public int getDestTableId() {
+    return destTableId;
+  }
+
+  public void setDestTableId(int destTableId) {
+    this.destTableId = destTableId;
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (revision 739005)
+++ ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (working copy)
@@ -53,6 +53,8 @@
   private LogHelper console;
+  private Map idToTableMap = null;
+
   // Job Hash Map
   private HashMap queryInfoMap = new HashMap();
@@ -67,14 +69,18 @@
   public static enum Keys {
     SESSION_ID, QUERY_ID, TASK_ID, QUERY_RET_CODE, QUERY_NUM_TASKS, QUERY_STRING, TIME,
-    TASK_RET_CODE, TASK_NAME, TASK_HADOOP_ID, TASK_HADOOP_PROGRESS, TASK_COUNTERS, TASK_NUM_REDUCERS
+    TASK_RET_CODE, TASK_NAME, TASK_HADOOP_ID, TASK_HADOOP_PROGRESS, TASK_COUNTERS, TASK_NUM_REDUCERS, TASK_ROWS_INSERTED
   };
   private static final String KEY = "(\\w+)";
   private static final String VALUE = "[[^\"]?]+"; // anything but a " in ""
+  private static final String ROW_COUNT_PATTERN = "TABLE_ID_(\\d+)_ROWCOUNT";
   private static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
+
+  private static final Pattern rowCountPattern = Pattern.compile(ROW_COUNT_PATTERN);
+
   // temp buffer for parsed dataa
   private static Map parseBuffer = new HashMap();
@@ -190,7 +196,7 @@
     }
     Random randGen = new Random();
     histFileName = conf_file_loc + "/hive_job_log_" + ss.getSessionId()
-        +"_" + randGen.nextInt() + ".txt";
+        +"_" + Math.abs(randGen.nextInt()) + ".txt";
     console.printInfo("Hive history file=" + histFileName);
     histStream = new PrintWriter(histFileName);
@@ -299,6 +305,7 @@
    */
   public void setTaskCounters(String queryId, String taskId, RunningJob rj) {
     String id = queryId + ":" + taskId;
+    StringBuilder sb1 = new StringBuilder("");
     TaskInfo ti = taskInfoMap.get(id);
     if (ti == null)
       return;
@@ -318,13 +325,28 @@
           sb.append(counter.getDisplayName());
           sb.append(':');
           sb.append(counter.getCounter());
+          String tab = getRowCountTableName(counter.getDisplayName());
+          if (tab != null){
+            if (sb1.length() > 0)
+              sb1.append(",");
+            sb1.append(tab);
+            sb1.append(':');
+            sb1.append(counter.getCounter());
+
+
+          }
         }
       }
+
+
     } catch (Exception e) {
       e.printStackTrace();
     }
-    taskInfoMap.get(id).hm.put(Keys.TASK_COUNTERS.name(), sb.toString());
+    if (sb1.length()>0)
+      taskInfoMap.get(id).hm.put(Keys.TASK_ROWS_INSERTED.name(), sb1.toString());
+    if (sb.length() > 0)
+      taskInfoMap.get(id).hm.put(Keys.TASK_COUNTERS.name(), sb.toString());
   }
   /**
@@ -391,5 +413,31 @@
     log(RecordTypes.TaskProgress, ti.hm);
   }
+
+  /**
+   * Set the table to id map
+   * @param map
+   */
+  public void setIdToTableMap(Map map){
+    idToTableMap = map;
+  }
+  /**
+   * Returns table name for the counter name
+   * @param name
+   * @return tableName
+   */
+  String getRowCountTableName(String name){
+    if (idToTableMap == null) return null;
+    Matcher m = rowCountPattern.matcher(name);
+
+    if (m.find()){
+      String tuple = m.group(1);
+      return idToTableMap.get(tuple);
+    }
+    return null;
+
+  }
+
+
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (revision 739005)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (working copy)
@@ -46,7 +46,9 @@
   protected final LogHelper console;
   protected Context ctx;
-
+  protected HashMap idToTableNameMap;
+
+
   public BaseSemanticAnalyzer(HiveConf conf) throws SemanticException {
     try {
       this.conf = conf;
@@ -59,11 +61,18 @@
       Random rand = new Random();
       this.randomid = Math.abs(rand.nextInt()%rand.nextInt());
       this.pathid = 10000;
+      this.idToTableNameMap = new HashMap();
     } catch (Exception e) {
       throw new SemanticException (e);
     }
   }
+
+  public HashMap getIdToTableNameMap() {
+    return idToTableNameMap;
+  }
+
+
   public abstract void analyzeInternal(ASTNode ast, Context ctx) throws SemanticException;
   public void analyze(ASTNode ast, Context ctx) throws SemanticException {
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 739005)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -80,6 +80,7 @@
   private List loadFileWork;
   private QB qb;
   private ASTNode ast;
+  private int destTableId;
   private static class Phase1Ctx {
     String dest;
@@ -97,7 +98,11 @@
     this.loadTableWork = new ArrayList();
     this.loadFileWork = new ArrayList();
     opParseCtx = new HashMap<Operator<? extends Serializable>, OpParseContext>();
+    this.destTableId = 1;
+
+
   }
+
   @Override
   protected void reset() {
@@ -107,6 +112,8 @@
     this.loadFileWork.clear();
     this.topOps.clear();
     this.topSelOps.clear();
+    this.destTableId = 1;
+    this.idToTableNameMap.clear();
     qb = null;
     ast = null;
   }
@@ -2004,14 +2011,22 @@
     // create a temporary directory name and chain it to the plan
     String dest_path = null;
     tableDesc table_desc = null;
+
+    int currentTableId = 0;
     Integer dest_type = qb.getMetaData().getDestTypeForAlias(dest);
     switch (dest_type.intValue()) {
     case QBMetaData.DEST_TABLE: {
+
+
       Table dest_tab = qb.getMetaData().getDestTableForAlias(dest);
       table_desc = Utilities.getTableDesc(dest_tab);
+
+      this.idToTableNameMap.put( String.valueOf(this.destTableId), dest_tab.getName());
+      currentTableId = this.destTableId;
+      this.destTableId ++;
       dest_path = dest_tab.getPath().toString();
       // Create the work for moving the table
@@ -2026,6 +2041,10 @@
       Table dest_tab = dest_part.getTable();
       table_desc = Utilities.getTableDesc(dest_tab);
       dest_path = dest_part.getPath()[0].toString();
+      this.idToTableNameMap.put( String.valueOf(this.destTableId), dest_tab.getName());
+      currentTableId = this.destTableId;
+      this.destTableId ++;
+
      this.loadTableWork.add(new loadTableDesc(queryTmpdir, table_desc, dest_part.getSpec()));
       break;
     }
@@ -2063,7 +2082,7 @@
     Operator output = putOpInsertMap(
       OperatorFactory.getAndMakeChild(
         new fileSinkDesc(queryTmpdir, table_desc,
-                         conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT)),
+                         conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId),
         new RowSchema(inputRR.getColumnInfos()), input), inputRR);
     LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 739005)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy)
@@ -197,10 +197,11 @@
       if (jobs > 0) {
         console.printInfo("Total MapReduce jobs = " + jobs);
       }
-      if (SessionState.get() != null)
+      if (SessionState.get() != null){
        SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_NUM_TASKS,
            String.valueOf(jobs));
-
+        SessionState.get().getHiveHistory().setIdToTableMap(sem.getIdToTableNameMap());
+      }
       String jobname = Utilities.abbreviate(command, maxlen - 6);
       int curJob = 0;
       for (Task rootTask : sem.getRootTasks()) {
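
Note for reviewers (illustrative only, not part of the patch): the sketch below shows one way the per-table row counts that FileSinkOperator now registers could be read back from a finished job, assuming the old org.apache.hadoop.mapred counter API already used in this patch. The RowCountProbe class, its method name, and the choice of TABLE_ID_1_ROWCOUNT are made up for the example.

// Illustrative sketch only -- not part of the patch.
import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.RunningJob;

public class RowCountProbe {
  // Returns the number of rows the job reported for the destination table
  // whose GlobalTableId is 1 (FileSinkOperator registers one counter per id).
  public static long rowsInsertedIntoTable1(RunningJob job) throws IOException {
    Counters counters = job.getCounters();
    // Counters.getCounter(Enum) resolves the counter group from the enum's
    // class, so FileSinkOperator$TableIdEnum does not need to be named here.
    return counters.getCounter(FileSinkOperator.TableIdEnum.TABLE_ID_1_ROWCOUNT);
  }
}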