diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1672453..c276092 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -379,6 +379,9 @@ // whether session is running in silent mode or not HIVESESSIONSILENT("hive.session.silent", false), + // Whether to enable history for this session + HIVE_SESSION_HISTORY_ENABLED("hive.session.history.enabled", false), + // query being executed (multiple per session) HIVEQUERYSTRING("hive.query.string", ""), diff --git a/conf/hive-default.xml.template b/conf/hive-default.xml.template index 3a7d1dc..1f38bbb 100644 --- a/conf/hive-default.xml.template +++ b/conf/hive-default.xml.template @@ -441,6 +441,12 @@ + hive.session.history.enabled + false + Whether to log hive query, query plan, runtime statistics etc + + + hive.map.aggr.hash.min.reduction 0.5 Hash aggregation will be turned off if the ratio between hash diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml index 544ba35..407c8f6 100644 --- a/data/conf/hive-site.xml +++ b/data/conf/hive-site.xml @@ -53,6 +53,12 @@ + hive.session.history.enabled + true + Whether to log hive query, query plan, runtime statistics etc + + + javax.jdo.option.ConnectionURL jdbc:derby:;databaseName=../build/test/junit_metastore_db;create=true diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java index e1c1ae3..3986f89 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java @@ -18,55 +18,21 @@ package org.apache.hadoop.hive.ql.history; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.PrintWriter; import java.io.Serializable; import java.util.HashMap; import java.util.Map; -import java.util.Random; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapred.Counters.Counter; -import org.apache.hadoop.mapred.Counters.Group; /** - * HiveHistory. - * + * HiveHistory. Logs information such as query, query plan, runtime statistics + * for into a file. + * Each session uses a new object, which creates a new file. */ -public class HiveHistory { - - PrintWriter histStream; // History File stream - - String histFileName; // History file name - - private static final Log LOG = LogFactory.getLog("hive.ql.exec.HiveHistory"); - - private LogHelper console; - - private Map idToTableMap = null; - - // Job Hash Map - private final HashMap queryInfoMap = new HashMap(); - - // Task Hash Map - private final HashMap taskInfoMap = new HashMap(); - - private static final String DELIMITER = " "; +public interface HiveHistory { /** * RecordTypes. 
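Note: the new hive.session.history.enabled property defaults to false (in both HiveConf and hive-default.xml.template) and is turned on in the test hive-site.xml, presumably so the existing history-dependent tests keep producing a real history file. A minimal sketch, not part of the patch, of how the flag is read; it mirrors the SessionState.start() change further down:

    // Sketch only: read the new flag from a SessionState-backed HiveConf.
    HiveConf conf = new HiveConf(SessionState.class);
    boolean historyEnabled =
        conf.getBoolVar(HiveConf.ConfVars.HIVE_SESSION_HISTORY_ENABLED);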
@@ -105,18 +71,6 @@ ROWS_INSERTED }; - private static final String KEY = "(\\w+)"; - private static final String VALUE = "[[^\"]?]+"; // anything but a " in "" - private static final String ROW_COUNT_PATTERN = "TABLE_ID_(\\d+)_ROWCOUNT"; - - private static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" - + VALUE + "\""); - - private static final Pattern rowCountPattern = Pattern.compile(ROW_COUNT_PATTERN); - - // temp buffer for parsed dataa - private static Map parseBuffer = new HashMap(); - /** * Listner interface Parser will call handle function for each record type. */ @@ -126,63 +80,6 @@ } /** - * Parses history file and calls call back functions. - * - * @param path - * @param l - * @throws IOException - */ - public static void parseHiveHistory(String path, Listener l) throws IOException { - FileInputStream fi = new FileInputStream(path); - BufferedReader reader = new BufferedReader(new InputStreamReader(fi)); - try { - String line = null; - StringBuilder buf = new StringBuilder(); - while ((line = reader.readLine()) != null) { - buf.append(line); - // if it does not end with " then it is line continuation - if (!line.trim().endsWith("\"")) { - continue; - } - parseLine(buf.toString(), l); - buf = new StringBuilder(); - } - } finally { - try { - reader.close(); - } catch (IOException ex) { - } - } - } - - /** - * Parse a single line of history. - * - * @param line - * @param l - * @throws IOException - */ - private static void parseLine(String line, Listener l) throws IOException { - // extract the record type - int idx = line.indexOf(' '); - String recType = line.substring(0, idx); - String data = line.substring(idx + 1, line.length()); - - Matcher matcher = pattern.matcher(data); - - while (matcher.find()) { - String tuple = matcher.group(0); - String[] parts = tuple.split("="); - - parseBuffer.put(parts[0], parts[1].substring(1, parts[1].length() - 1)); - } - - l.handle(RecordTypes.valueOf(recType), parseBuffer); - - parseBuffer.clear(); - } - - /** * Info. * */ @@ -217,106 +114,14 @@ private static void parseLine(String line, Listener l) throws IOException { }; /** - * Construct HiveHistory object an open history log file. - * - * @param ss - */ - public HiveHistory(SessionState ss) { - - try { - console = new LogHelper(LOG); - String conf_file_loc = ss.getConf().getVar( - HiveConf.ConfVars.HIVEHISTORYFILELOC); - if ((conf_file_loc == null) || conf_file_loc.length() == 0) { - console.printError("No history file location given"); - return; - } - - // Create directory - File f = new File(conf_file_loc); - if (!f.exists()) { - if (!f.mkdirs()) { - console.printError("Unable to create log directory " + conf_file_loc); - return; - } - } - Random randGen = new Random(); - do { - histFileName = conf_file_loc + "/hive_job_log_" + ss.getSessionId() + "_" - + Math.abs(randGen.nextInt()) + ".txt"; - } while (new File(histFileName).exists()); - console.printInfo("Hive history file=" + histFileName); - histStream = new PrintWriter(histFileName); - - HashMap hm = new HashMap(); - hm.put(Keys.SESSION_ID.name(), ss.getSessionId()); - log(RecordTypes.SessionStart, hm); - } catch (FileNotFoundException e) { - console.printError("FAILED: Failed to open Query Log : " + histFileName - + " " + e.getMessage(), "\n" - + org.apache.hadoop.util.StringUtils.stringifyException(e)); - } - - } - - /** * @return historyFileName */ - public String getHistFileName() { - return histFileName; - } - - /** - * Write the a history record to history file. 
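Note: the parsing helpers removed here (parseHiveHistory(), parseLine()) and their regex constants are not dropped from the codebase; they reappear essentially verbatim in the new HiveHistoryUtil class later in this patch, and HiveHistoryViewer is switched over to HiveHistoryUtil.parseHiveHistory(). HiveHistory itself keeps only the RecordTypes and Keys enums, the Listener callback interface, the Info/QueryInfo/TaskInfo holder classes, and the now-abstract logging methods.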
- * - * @param rt - * @param keyValMap - */ - void log(RecordTypes rt, Map keyValMap) { - - if (histStream == null) { - return; - } - - StringBuilder sb = new StringBuilder(); - sb.append(rt.name()); - - for (Map.Entry ent : keyValMap.entrySet()) { - - sb.append(DELIMITER); - String key = ent.getKey(); - String val = ent.getValue(); - if(val != null) { - val = val.replace('\n', ' '); - } - sb.append(key + "=\"" + val + "\""); - - } - sb.append(DELIMITER); - sb.append(Keys.TIME.name() + "=\"" + System.currentTimeMillis() + "\""); - histStream.println(sb); - histStream.flush(); - - } + public String getHistFileName(); /** * Called at the start of job Driver.execute(). */ - public void startQuery(String cmd, String id) { - SessionState ss = SessionState.get(); - if (ss == null) { - return; - } - QueryInfo ji = new QueryInfo(); - - ji.hm.put(Keys.QUERY_ID.name(), id); - ji.hm.put(Keys.QUERY_STRING.name(), cmd); - - queryInfoMap.put(id, ji); - - log(RecordTypes.QueryStart, ji.hm); - - } + public void startQuery(String cmd, String id); /** * Used to set job status and other attributes of a job. @@ -325,13 +130,7 @@ public void startQuery(String cmd, String id) { * @param propName * @param propValue */ - public void setQueryProperty(String queryId, Keys propName, String propValue) { - QueryInfo ji = queryInfoMap.get(queryId); - if (ji == null) { - return; - } - ji.hm.put(propName.name(), propValue); - } + public void setQueryProperty(String queryId, Keys propName, String propValue); /** * Used to set task properties. @@ -341,14 +140,7 @@ public void setQueryProperty(String queryId, Keys propName, String propValue) { * @param propValue */ public void setTaskProperty(String queryId, String taskId, Keys propName, - String propValue) { - String id = queryId + ":" + taskId; - TaskInfo ti = taskInfoMap.get(id); - if (ti == null) { - return; - } - ti.hm.put(propName.name(), propValue); - } + String propValue); /** * Serialize the task counters and set as a task property. 
@@ -357,80 +149,16 @@ public void setTaskProperty(String queryId, String taskId, Keys propName, * @param taskId * @param ctrs */ - public void setTaskCounters(String queryId, String taskId, Counters ctrs) { - String id = queryId + ":" + taskId; - QueryInfo ji = queryInfoMap.get(queryId); - StringBuilder sb1 = new StringBuilder(""); - TaskInfo ti = taskInfoMap.get(id); - if ((ti == null) || (ctrs == null)) { - return; - } - StringBuilder sb = new StringBuilder(""); - try { - - boolean first = true; - for (Group group : ctrs) { - for (Counter counter : group) { - if (first) { - first = false; - } else { - sb.append(','); - } - sb.append(group.getDisplayName()); - sb.append('.'); - sb.append(counter.getDisplayName()); - sb.append(':'); - sb.append(counter.getCounter()); - String tab = getRowCountTableName(counter.getDisplayName()); - if (tab != null) { - if (sb1.length() > 0) { - sb1.append(","); - } - sb1.append(tab); - sb1.append('~'); - sb1.append(counter.getCounter()); - ji.rowCountMap.put(tab, counter.getCounter()); - - } - } - } + public void setTaskCounters(String queryId, String taskId, Counters ctrs); - } catch (Exception e) { - LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e)); - } - if (sb1.length() > 0) { - taskInfoMap.get(id).hm.put(Keys.ROWS_INSERTED.name(), sb1.toString()); - queryInfoMap.get(queryId).hm.put(Keys.ROWS_INSERTED.name(), sb1 - .toString()); - } - if (sb.length() > 0) { - taskInfoMap.get(id).hm.put(Keys.TASK_COUNTERS.name(), sb.toString()); - } - } - - public void printRowCount(String queryId) { - QueryInfo ji = queryInfoMap.get(queryId); - if (ji == null) { - return; - } - for (String tab : ji.rowCountMap.keySet()) { - console.printInfo(ji.rowCountMap.get(tab) + " Rows loaded to " + tab); - } - } + public void printRowCount(String queryId); /** * Called at the end of Job. A Job is sql query. * * @param queryId */ - public void endQuery(String queryId) { - QueryInfo ji = queryInfoMap.get(queryId); - if (ji == null) { - return; - } - log(RecordTypes.QueryEnd, ji.hm); - queryInfoMap.remove(queryId); - } + public void endQuery(String queryId); /** * Called at the start of a task. Called by Driver.run() A Job can have @@ -439,108 +167,32 @@ public void endQuery(String queryId) { * @param task */ public void startTask(String queryId, Task task, - String taskName) { - SessionState ss = SessionState.get(); - if (ss == null) { - return; - } - TaskInfo ti = new TaskInfo(); - - ti.hm.put(Keys.QUERY_ID.name(), ss.getQueryId()); - ti.hm.put(Keys.TASK_ID.name(), task.getId()); - ti.hm.put(Keys.TASK_NAME.name(), taskName); - - String id = queryId + ":" + task.getId(); - taskInfoMap.put(id, ti); - - log(RecordTypes.TaskStart, ti.hm); - - } + String taskName); /** * Called at the end of a task. * * @param task */ - public void endTask(String queryId, Task task) { - String id = queryId + ":" + task.getId(); - TaskInfo ti = taskInfoMap.get(id); - - if (ti == null) { - return; - } - log(RecordTypes.TaskEnd, ti.hm); - taskInfoMap.remove(id); - } + public void endTask(String queryId, Task task); /** * Called at the end of a task. * * @param task */ - public void progressTask(String queryId, Task task) { - String id = queryId + ":" + task.getId(); - TaskInfo ti = taskInfoMap.get(id); - if (ti == null) { - return; - } - log(RecordTypes.TaskProgress, ti.hm); - - } - - /** - * write out counters. 
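With the method bodies removed, the interface is reduced to the lifecycle hooks that Driver already invokes. A rough sketch of the expected call order follows; the argument values (queryString, queryId, task, taskName, counters) are illustrative placeholders, not names from the patch:

    // Illustrative call order only; Driver makes these calls during query execution.
    HiveHistory hist = SessionState.get().getHiveHistory();
    hist.startQuery(queryString, queryId);                  // QueryStart record
    hist.startTask(queryId, task, taskName);                // TaskStart record
    hist.progressTask(queryId, task);                       // TaskProgress records while the task runs
    hist.setTaskCounters(queryId, task.getId(), counters);  // serialized Hadoop counters
    hist.endTask(queryId, task);                            // TaskEnd record
    hist.endQuery(queryId);                                 // QueryEnd record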
- */ - static ThreadLocal> ctrMapFactory = - new ThreadLocal>() { - @Override - protected Map initialValue() { - return new HashMap(); - } - }; - - public void logPlanProgress(QueryPlan plan) throws IOException { - Map ctrmap = ctrMapFactory.get(); - ctrmap.put("plan", plan.toString()); - log(RecordTypes.Counters, ctrmap); - } + public void progressTask(String queryId, Task task); + public void logPlanProgress(QueryPlan plan) throws IOException; /** * Set the table to id map. * * @param map */ - public void setIdToTableMap(Map map) { - idToTableMap = map; - } - - /** - * Returns table name for the counter name. - * - * @param name - * @return tableName - */ - String getRowCountTableName(String name) { - if (idToTableMap == null) { - return null; - } - Matcher m = rowCountPattern.matcher(name); + public void setIdToTableMap(Map map); - if (m.find()) { - String tuple = m.group(1); - return idToTableMap.get(tuple); - } - return null; + public void closeStream(); - } - public void closeStream() { - IOUtils.cleanup(LOG, histStream); - } - @Override - public void finalize() throws Throwable { - closeStream(); - super.finalize(); - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java new file mode 100644 index 0000000..355dc88 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java @@ -0,0 +1,405 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.history; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.Counters.Group; + +/** + * HiveHistory. Logs information such as query, query plan, runtime statistics + * for into a file. + * Each session uses a new object, which creates a new file. 
+ */ +public class HiveHistoryImpl implements HiveHistory{ + + PrintWriter histStream; // History File stream + + String histFileName; // History file name + + private static final Log LOG = LogFactory.getLog("hive.ql.exec.HiveHistory"); + + private LogHelper console; + + private Map idToTableMap = null; + + // Job Hash Map + private final HashMap queryInfoMap = new HashMap(); + + // Task Hash Map + private final HashMap taskInfoMap = new HashMap(); + + private static final String DELIMITER = " "; + + private static final String ROW_COUNT_PATTERN = "TABLE_ID_(\\d+)_ROWCOUNT"; + + private static final Pattern rowCountPattern = Pattern.compile(ROW_COUNT_PATTERN); + + /** + * Construct HiveHistory object an open history log file. + * + * @param ss + */ + public HiveHistoryImpl(SessionState ss) { + + try { + console = new LogHelper(LOG); + String conf_file_loc = ss.getConf().getVar( + HiveConf.ConfVars.HIVEHISTORYFILELOC); + if ((conf_file_loc == null) || conf_file_loc.length() == 0) { + console.printError("No history file location given"); + return; + } + + // Create directory + File f = new File(conf_file_loc); + if (!f.exists()) { + if (!f.mkdirs()) { + console.printError("Unable to create log directory " + conf_file_loc); + return; + } + } + Random randGen = new Random(); + do { + histFileName = conf_file_loc + "/hive_job_log_" + ss.getSessionId() + "_" + + Math.abs(randGen.nextInt()) + ".txt"; + } while (new File(histFileName).exists()); + console.printInfo("Hive history file=" + histFileName); + histStream = new PrintWriter(histFileName); + + HashMap hm = new HashMap(); + hm.put(Keys.SESSION_ID.name(), ss.getSessionId()); + log(RecordTypes.SessionStart, hm); + } catch (FileNotFoundException e) { + console.printError("FAILED: Failed to open Query Log : " + histFileName + + " " + e.getMessage(), "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + + } + + /** + * @return historyFileName + */ + public String getHistFileName() { + return histFileName; + } + + /** + * Write the a history record to history file. + * + * @param rt + * @param keyValMap + */ + void log(RecordTypes rt, Map keyValMap) { + + if (histStream == null) { + return; + } + + StringBuilder sb = new StringBuilder(); + sb.append(rt.name()); + + for (Map.Entry ent : keyValMap.entrySet()) { + + sb.append(DELIMITER); + String key = ent.getKey(); + String val = ent.getValue(); + if(val != null) { + val = val.replace('\n', ' '); + } + sb.append(key + "=\"" + val + "\""); + + } + sb.append(DELIMITER); + sb.append(Keys.TIME.name() + "=\"" + System.currentTimeMillis() + "\""); + histStream.println(sb); + histStream.flush(); + + } + + /** + * Called at the start of job Driver.execute(). + */ + public void startQuery(String cmd, String id) { + SessionState ss = SessionState.get(); + if (ss == null) { + return; + } + QueryInfo ji = new QueryInfo(); + + ji.hm.put(Keys.QUERY_ID.name(), id); + ji.hm.put(Keys.QUERY_STRING.name(), cmd); + + queryInfoMap.put(id, ji); + + log(RecordTypes.QueryStart, ji.hm); + + } + + /** + * Used to set job status and other attributes of a job. + * + * @param queryId + * @param propName + * @param propValue + */ + @Override + public void setQueryProperty(String queryId, Keys propName, String propValue) { + QueryInfo ji = queryInfoMap.get(queryId); + if (ji == null) { + return; + } + ji.hm.put(propName.name(), propValue); + } + + /** + * Used to set task properties. 
+ * + * @param taskId + * @param propName + * @param propValue + */ + @Override + public void setTaskProperty(String queryId, String taskId, Keys propName, + String propValue) { + String id = queryId + ":" + taskId; + TaskInfo ti = taskInfoMap.get(id); + if (ti == null) { + return; + } + ti.hm.put(propName.name(), propValue); + } + + /** + * Serialize the task counters and set as a task property. + * + * @param queryId + * @param taskId + * @param ctrs + */ + public void setTaskCounters(String queryId, String taskId, Counters ctrs) { + String id = queryId + ":" + taskId; + QueryInfo ji = queryInfoMap.get(queryId); + StringBuilder sb1 = new StringBuilder(""); + TaskInfo ti = taskInfoMap.get(id); + if ((ti == null) || (ctrs == null)) { + return; + } + StringBuilder sb = new StringBuilder(""); + try { + + boolean first = true; + for (Group group : ctrs) { + for (Counter counter : group) { + if (first) { + first = false; + } else { + sb.append(','); + } + sb.append(group.getDisplayName()); + sb.append('.'); + sb.append(counter.getDisplayName()); + sb.append(':'); + sb.append(counter.getCounter()); + String tab = getRowCountTableName(counter.getDisplayName()); + if (tab != null) { + if (sb1.length() > 0) { + sb1.append(","); + } + sb1.append(tab); + sb1.append('~'); + sb1.append(counter.getCounter()); + ji.rowCountMap.put(tab, counter.getCounter()); + + } + } + } + + } catch (Exception e) { + LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + if (sb1.length() > 0) { + taskInfoMap.get(id).hm.put(Keys.ROWS_INSERTED.name(), sb1.toString()); + queryInfoMap.get(queryId).hm.put(Keys.ROWS_INSERTED.name(), sb1 + .toString()); + } + if (sb.length() > 0) { + taskInfoMap.get(id).hm.put(Keys.TASK_COUNTERS.name(), sb.toString()); + } + } + + public void printRowCount(String queryId) { + QueryInfo ji = queryInfoMap.get(queryId); + if (ji == null) { + return; + } + for (String tab : ji.rowCountMap.keySet()) { + console.printInfo(ji.rowCountMap.get(tab) + " Rows loaded to " + tab); + } + } + + /** + * Called at the end of Job. A Job is sql query. + * + * @param queryId + */ + public void endQuery(String queryId) { + QueryInfo ji = queryInfoMap.get(queryId); + if (ji == null) { + return; + } + log(RecordTypes.QueryEnd, ji.hm); + queryInfoMap.remove(queryId); + } + + /** + * Called at the start of a task. Called by Driver.run() A Job can have + * multiple tasks. Tasks will have multiple operator. + * + * @param task + */ + public void startTask(String queryId, Task task, + String taskName) { + SessionState ss = SessionState.get(); + if (ss == null) { + return; + } + TaskInfo ti = new TaskInfo(); + + ti.hm.put(Keys.QUERY_ID.name(), ss.getQueryId()); + ti.hm.put(Keys.TASK_ID.name(), task.getId()); + ti.hm.put(Keys.TASK_NAME.name(), taskName); + + String id = queryId + ":" + task.getId(); + taskInfoMap.put(id, ti); + + log(RecordTypes.TaskStart, ti.hm); + + } + + /** + * Called at the end of a task. + * + * @param task + */ + public void endTask(String queryId, Task task) { + String id = queryId + ":" + task.getId(); + TaskInfo ti = taskInfoMap.get(id); + + if (ti == null) { + return; + } + log(RecordTypes.TaskEnd, ti.hm); + taskInfoMap.remove(id); + } + + /** + * Called at the end of a task. + * + * @param task + */ + public void progressTask(String queryId, Task task) { + String id = queryId + ":" + task.getId(); + TaskInfo ti = taskInfoMap.get(id); + if (ti == null) { + return; + } + log(RecordTypes.TaskProgress, ti.hm); + + } + + /** + * write out counters. 
+ */ + static ThreadLocal> ctrMapFactory = + new ThreadLocal>() { + @Override + protected Map initialValue() { + return new HashMap(); + } + }; + + public void logPlanProgress(QueryPlan plan) throws IOException { + Map ctrmap = ctrMapFactory.get(); + ctrmap.put("plan", plan.toString()); + log(RecordTypes.Counters, ctrmap); + } + + /** + * Set the table to id map. + * + * @param map + */ + public void setIdToTableMap(Map map) { + idToTableMap = map; + } + + /** + * Returns table name for the counter name. + * + * @param name + * @return tableName + */ + String getRowCountTableName(String name) { + if (idToTableMap == null) { + return null; + } + Matcher m = rowCountPattern.matcher(name); + + if (m.find()) { + String tuple = m.group(1); + return idToTableMap.get(tuple); + } + return null; + + } + + public void closeStream() { + IOUtils.cleanup(LOG, histStream); + } + + @Override + public void finalize() throws Throwable { + closeStream(); + super.finalize(); + } + + + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryProxyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryProxyHandler.java new file mode 100644 index 0000000..c9b98bb --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryProxyHandler.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.history; + +/** + * Proxy handler for HiveHistory to do nothing if + * HiveHistory is disabled. 
+ */ +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +public class HiveHistoryProxyHandler implements InvocationHandler { + + public static HiveHistory getNoOpHiveHistoryProxy() { + return (HiveHistory)Proxy.newProxyInstance(HiveHistory.class.getClassLoader(), + new Class[] {HiveHistory.class}, + new HiveHistoryProxyHandler()); + } + + @Override + public Object invoke(Object arg0, final Method method, final Object[] args){ + return null; + } + +} + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryUtil.java new file mode 100644 index 0000000..ae35845 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryUtil.java @@ -0,0 +1,83 @@ +package org.apache.hadoop.hive.ql.history; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.hive.ql.history.HiveHistory.Listener; +import org.apache.hadoop.hive.ql.history.HiveHistory.RecordTypes; + +public class HiveHistoryUtil { + /** + * Parses history file and calls call back functions. Also look at + * HiveHistoryViewer + * + * @param path + * @param l + * @throws IOException + */ + public static void parseHiveHistory(String path, Listener l) throws IOException { + FileInputStream fi = new FileInputStream(path); + BufferedReader reader = new BufferedReader(new InputStreamReader(fi)); + try { + String line = null; + StringBuilder buf = new StringBuilder(); + while ((line = reader.readLine()) != null) { + buf.append(line); + // if it does not end with " then it is line continuation + if (!line.trim().endsWith("\"")) { + continue; + } + parseLine(buf.toString(), l); + buf = new StringBuilder(); + } + } finally { + try { + reader.close(); + } catch (IOException ex) { + } + } + } + + + private static final String KEY = "(\\w+)"; + private static final String VALUE = "[[^\"]?]+"; // anything but a " in "" + private static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + + VALUE + "\""); + + // temp buffer for parsed dataa + private static Map parseBuffer = new HashMap(); + + /** + * Parse a single line of history. 
+ * + * @param line + * @param l + * @throws IOException + */ + private static void parseLine(String line, Listener l) throws IOException { + // extract the record type + int idx = line.indexOf(' '); + String recType = line.substring(0, idx); + String data = line.substring(idx + 1, line.length()); + + Matcher matcher = pattern.matcher(data); + + while (matcher.find()) { + String tuple = matcher.group(0); + String[] parts = tuple.split("="); + + parseBuffer.put(parts[0], parts[1].substring(1, parts[1].length() - 1)); + } + + l.handle(RecordTypes.valueOf(recType), parseBuffer); + + parseBuffer.clear(); + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java index fdd56db..61fc236 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java @@ -67,7 +67,7 @@ public String getSessionId() { void init() { try { - HiveHistory.parseHiveHistory(historyFile, this); + HiveHistoryUtil.parseHiveHistory(historyFile, this); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); @@ -77,7 +77,7 @@ void init() { /** * Implementation Listner interface function. - * + * * @see org.apache.hadoop.hive.ql.history.HiveHistory.Listener#handle(org.apache.hadoop.hive.ql.history.HiveHistory.RecordTypes, * java.util.Map) */ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 3d43451..8427d96 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -42,6 +42,8 @@ import org.apache.hadoop.hive.ql.MapRedStats; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.history.HiveHistory; +import org.apache.hadoop.hive.ql.history.HiveHistoryImpl; +import org.apache.hadoop.hive.ql.history.HiveHistoryProxyHandler; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.plan.HiveOperation; @@ -248,19 +250,19 @@ public static SessionState start(SessionState startSs) { .setVar(HiveConf.ConfVars.HIVESESSIONID, makeSessionId()); } - if (startSs.hiveHist == null) { - startSs.hiveHist = new HiveHistory(startSs); + if(startSs.hiveHist == null){ + if (startSs.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SESSION_HISTORY_ENABLED)) { + startSs.hiveHist = new HiveHistoryImpl(startSs); + }else { + //Hive history is disabled, create a no-op proxy + startSs.hiveHist = HiveHistoryProxyHandler.getNoOpHiveHistoryProxy(); + } } if (startSs.getTmpOutputFile() == null) { - // per-session temp file containing results to be sent from HiveServer to HiveClient - File tmpDir = new File( - HiveConf.getVar(startSs.getConf(), HiveConf.ConfVars.HIVEHISTORYFILELOC)); - String sessionID = startSs.getConf().getVar(HiveConf.ConfVars.HIVESESSIONID); + // set temp file containing results to be sent to HiveClient try { - File tmpFile = File.createTempFile(sessionID, ".pipeout", tmpDir); - tmpFile.deleteOnExit(); - startSs.setTmpOutputFile(tmpFile); + startSs.setTmpOutputFile(createTempFile(startSs.getConf())); } catch (IOException e) { throw new RuntimeException(e); } @@ -282,6 +284,28 @@ public static SessionState start(SessionState startSs) { } /** + * @param conf + * @return per-session temp file + * @throws IOException + */ + private 
static File createTempFile(HiveConf conf) throws IOException { + String hHistDir = + HiveConf.getVar(conf, HiveConf.ConfVars.HIVEHISTORYFILELOC); + + File tmpDir = new File(hHistDir); + String sessionID = conf.getVar(HiveConf.ConfVars.HIVESESSIONID); + if (!tmpDir.exists()) { + if (!tmpDir.mkdirs()) { + throw new RuntimeException("Unable to create log directory " + + hHistDir); + } + } + File tmpFile = File.createTempFile(sessionID, ".pipeout", tmpDir); + tmpFile.deleteOnExit(); + return tmpFile; + } + + /** * get the current session. */ public static SessionState get() { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java b/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java index a783303..5b4a586 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java @@ -20,6 +20,7 @@ import java.io.PrintStream; import java.io.UnsupportedEncodingException; +import java.lang.reflect.Proxy; import java.util.LinkedList; import java.util.Map; @@ -31,9 +32,9 @@ import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.QTestUtil.QTestSetup; import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo; import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo; @@ -75,7 +76,7 @@ protected void setUp() { + tmpdir); } } - + conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); // copy the test files into hadoop if required. 
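Two related changes in SessionState.start(): the session's HiveHistory is now a HiveHistoryImpl only when hive.session.history.enabled is set, and the no-op proxy otherwise; and the per-session .pipeout temp file creation is factored out into createTempFile(), which now also creates the hive.querylog.location directory if it is missing. Previously that directory was created only by the HiveHistory constructor, so this presumably keeps temp-file creation working when history is disabled and that constructor never runs.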
@@ -179,7 +180,7 @@ public void testQueryloglocParentDirNotExist() throws Exception { HiveConf conf = new HiveConf(SessionState.class); conf.set(HiveConf.ConfVars.HIVEHISTORYFILELOC.toString(), actualDir); SessionState ss = new CliSessionState(conf); - HiveHistory hiveHistory = new HiveHistory(ss); + HiveHistory hiveHistory = new HiveHistoryImpl(ss); Path actualPath = new Path(actualDir); if (!fs.exists(actualPath)) { fail("Query location path is not exist :" + actualPath.toString()); @@ -192,4 +193,38 @@ public void testQueryloglocParentDirNotExist() throws Exception { } } + /** + * Check if HiveHistoryImpl class is returned when hive history is enabled + * @throws Exception + */ + public void testHiveHistoryConfigEnabled() throws Exception { + HiveConf conf = new HiveConf(SessionState.class); + conf.setBoolVar(ConfVars.HIVE_SESSION_HISTORY_ENABLED, true); + SessionState ss = new CliSessionState(conf); + SessionState.start(ss); + HiveHistory hHistory = ss.getHiveHistory(); + assertEquals("checking hive history class when history is enabled", + hHistory.getClass(), HiveHistoryImpl.class); + } + /** + * Check if HiveHistory class is a Proxy class when hive history is disabled + * @throws Exception + */ + public void testHiveHistoryConfigDisabled() throws Exception { + HiveConf conf = new HiveConf(SessionState.class); + conf.setBoolVar(ConfVars.HIVE_SESSION_HISTORY_ENABLED, false); + SessionState ss = new CliSessionState(conf); + SessionState.start(ss); + HiveHistory hHistory = ss.getHiveHistory(); + assertTrue("checking hive history class when history is disabled", + hHistory.getClass() != HiveHistoryImpl.class); + System.err.println("hHistory.getClass" + hHistory.getClass()); + assertTrue("verifying proxy class is used when history is disabled", + Proxy.isProxyClass(hHistory.getClass())); + + } + + + + }
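Note that the history implementation is chosen once, inside SessionState.start(), so the flag has to be set before the session starts (for example in hive-site.xml or via --hiveconf hive.session.history.enabled=true); issuing set hive.session.history.enabled=true inside an already-running session will not retroactively create a history file for that session. The two new tests above cover both paths: an enabled session gets a HiveHistoryImpl, a disabled one gets the dynamic proxy.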