diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e971644..1dcd67d 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -763,6 +763,8 @@
HIVE_SERVER2_TABLE_TYPE_MAPPING("hive.server2.table.type.mapping", "CLASSIC"),
HIVE_SERVER2_SESSION_HOOK("hive.server2.session.hook", ""),
+ HIVE_SERVER2_IN_MEM_LOGGING("hive.server2.in.mem.logging", true),
+ HIVE_SERVER2_IN_MEM_LOG_SIZE("hive.server2.in.mem.log.size", 128 * 1024),
HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list", null),
// If this is set all move tasks at the end of a multi-insert query will only begin once all
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 1ee756c..92d1065 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1780,6 +1780,22 @@
+ hive.server2.in.mem.logging
+ true
+
+ Whether to turn on hiveserver2 in memory logging
+
+
+
+
+ hive.server2.in.mem.log.size
+ 131072
+
+ Maximum size of the hiveserver2 in memory query log. Note that the size is per query.
+
+
+
+
hive.decode.partition.name
false
Whether to show the unquoted partition names in query results.
diff --git jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index 2912ece..6b9cba3 100644
--- jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -32,6 +32,8 @@
import org.apache.hive.service.cli.thrift.TCloseOperationResp;
import org.apache.hive.service.cli.thrift.TExecuteStatementReq;
import org.apache.hive.service.cli.thrift.TExecuteStatementResp;
+import org.apache.hive.service.cli.thrift.TGetLogReq;
+import org.apache.hive.service.cli.thrift.TGetLogResp;
import org.apache.hive.service.cli.thrift.TGetOperationStatusReq;
import org.apache.hive.service.cli.thrift.TGetOperationStatusResp;
import org.apache.hive.service.cli.thrift.TOperationHandle;
@@ -280,6 +282,25 @@ public boolean execute(String sql, String[] columnNames) throws SQLException {
throw new SQLException("Method not supported");
}
+ public String getLog() throws SQLException {
+ if (isClosed) {
+ throw new SQLException("Can't get log for statement after statement has been closed");
+ }
+
+ TGetLogReq getLogReq = new TGetLogReq();
+ TGetLogResp getLogResp;
+ getLogReq.setOperationHandle(stmtHandle);
+ try {
+ getLogResp = client.GetLog(getLogReq);
+ Utils.verifySuccessWithInfo(getLogResp.getStatus());
+ } catch (SQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SQLException(e.toString(), "08S01", e);
+ }
+ return getLogResp.getLog();
+ }
+
/*
* (non-Javadoc)
*
diff --git jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java
index 09ab3c2..be592c5 100644
--- jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -806,6 +806,30 @@ private void doTestErrorCase(String sql, String expectedMessage,
exceptionFound);
}
+ public void testGetLog() throws Exception {
+ HiveStatement stmt = (HiveStatement)con.createStatement();
+ assertNotNull("Statement is null", stmt);
+
+ ResultSet res = stmt.executeQuery("select count(*) from " + tableName);
+ ResultSetMetaData meta = res.getMetaData();
+
+ boolean moreRow = res.next();
+ while (moreRow) {
+ try {
+ moreRow = res.next();
+ } catch (SQLException e) {
+ throw e;
+ }
+ }
+
+ String log = stmt.getLog();
+ assertTrue("Operation Log looks incorrect" ,
+ log.contains("Parsing command: select count(*) from testHiveJdbcDriver_Table"));
+ assertTrue("Operation Log looks incorrect",
+ log.contains( "select count(*) from testHiveJdbcDriver_Table"));
+
+ }
+
public void testShowTables() throws SQLException {
Statement stmt = con.createStatement();
assertNotNull("Statement is null", stmt);
diff --git service/if/TCLIService.thrift service/if/TCLIService.thrift
index 6e20375..db50266 100644
--- service/if/TCLIService.thrift
+++ service/if/TCLIService.thrift
@@ -208,7 +208,7 @@ struct TUnionTypeEntry {
}
struct TUserDefinedTypeEntry {
- // The fully qualified name of the class implementing this type.
+ // The fully qualified name of the class implementing this type.
1: required string typeClassName
}
@@ -995,6 +995,22 @@ struct TFetchResultsResp {
3: optional TRowSet results
}
+// GetLog()
+//
+// Fetch operation log from the server corresponding to
+// a particular OperationHandle.
+struct TGetLogReq {
+ // Operation whose log is requested
+ 1: required TOperationHandle operationHandle
+}
+
+struct TGetLogResp {
+ 1: required TStatus status
+
+ 2: required string log
+
+}
+
service TCLIService {
TOpenSessionResp OpenSession(1:TOpenSessionReq req);
@@ -1028,4 +1044,6 @@ service TCLIService {
TGetResultSetMetadataResp GetResultSetMetadata(1:TGetResultSetMetadataReq req);
TFetchResultsResp FetchResults(1:TFetchResultsReq req);
+
+ TGetLogResp GetLog(1:TGetLogReq req);
}
diff --git service/src/java/org/apache/hive/service/cli/CLIService.java service/src/java/org/apache/hive/service/cli/CLIService.java
index 1a7f338..16ec330 100644
--- service/src/java/org/apache/hive/service/cli/CLIService.java
+++ service/src/java/org/apache/hive/service/cli/CLIService.java
@@ -35,6 +35,7 @@
import org.apache.hive.service.CompositeService;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.log.OperationLog;
import org.apache.hive.service.cli.session.SessionManager;
/**
@@ -258,8 +259,10 @@ public OperationHandle getFunctions(SessionHandle sessionHandle,
@Override
public OperationState getOperationStatus(OperationHandle opHandle)
throws HiveSQLException {
+ startLogCapture(opHandle);
OperationState opState = sessionManager.getOperationManager().getOperationState(opHandle);
LOG.info(opHandle + ": getOperationStatus()");
+ stopLogCapture();
return opState;
}
@@ -269,9 +272,12 @@ public OperationState getOperationStatus(OperationHandle opHandle)
@Override
public void cancelOperation(OperationHandle opHandle)
throws HiveSQLException {
+ startLogCapture(opHandle);
sessionManager.getOperationManager().getOperation(opHandle).
getParentSession().cancelOperation(opHandle);
LOG.info(opHandle + ": cancelOperation()");
+ sessionManager.getLogManager().destroyOperationLog(opHandle);
+ stopLogCapture();
}
/* (non-Javadoc)
@@ -280,9 +286,12 @@ public void cancelOperation(OperationHandle opHandle)
@Override
public void closeOperation(OperationHandle opHandle)
throws HiveSQLException {
+ startLogCapture(opHandle);
sessionManager.getOperationManager().getOperation(opHandle).
getParentSession().closeOperation(opHandle);
LOG.info(opHandle + ": closeOperation");
+ sessionManager.getLogManager().destroyOperationLog(opHandle);
+ stopLogCapture();
}
/* (non-Javadoc)
@@ -291,9 +300,11 @@ public void closeOperation(OperationHandle opHandle)
@Override
public TableSchema getResultSetMetadata(OperationHandle opHandle)
throws HiveSQLException {
+ startLogCapture(opHandle);
TableSchema tableSchema = sessionManager.getOperationManager().getOperation(opHandle).
getParentSession().getResultSetMetadata(opHandle);
LOG.info(opHandle + ": getResultSetMetadata()");
+ stopLogCapture();
return tableSchema;
}
@@ -303,9 +314,11 @@ public TableSchema getResultSetMetadata(OperationHandle opHandle)
@Override
public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
throws HiveSQLException {
+ startLogCapture(opHandle);
RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle).
getParentSession().fetchResults(opHandle, orientation, maxRows);
LOG.info(opHandle + ": fetchResults()");
+ stopLogCapture();
return rowSet;
}
@@ -315,9 +328,11 @@ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientatio
@Override
public RowSet fetchResults(OperationHandle opHandle)
throws HiveSQLException {
+ startLogCapture(opHandle);
RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle).
getParentSession().fetchResults(opHandle);
LOG.info(opHandle + ": fetchResults()");
+ stopLogCapture();
return rowSet;
}
@@ -341,4 +356,24 @@ public synchronized String getDelegationTokenFromMetaStore(String owner)
}
}
}
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#getLog(org.apache.hive.service.cli.OperationHandle)
+ */
+ @Override
+ public String getLog(OperationHandle opHandle)
+ throws HiveSQLException {
+ OperationLog log = sessionManager.getLogManager().getOperationLogByOperation(opHandle, false);
+ LOG.info(opHandle + ": getLog()");
+ return log.readOperationLog();
+ }
+
+ private void startLogCapture(OperationHandle operationHandle) throws HiveSQLException {
+ sessionManager.getLogManager().unregisterCurrentThread();
+ sessionManager.getLogManager().registerCurrentThread(operationHandle);
+ }
+
+ private void stopLogCapture() {
+ sessionManager.getLogManager().unregisterCurrentThread();
+ }
}
diff --git service/src/java/org/apache/hive/service/cli/CLIServiceClient.java service/src/java/org/apache/hive/service/cli/CLIServiceClient.java
index 14ef54f..8bd23bf 100644
--- service/src/java/org/apache/hive/service/cli/CLIServiceClient.java
+++ service/src/java/org/apache/hive/service/cli/CLIServiceClient.java
@@ -158,4 +158,7 @@ public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000);
}
+ @Override
+ public abstract String getLog(OperationHandle opHandle) throws HiveSQLException;
+
}
diff --git service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
index 9dca874..4a725b1 100644
--- service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
+++ service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
@@ -188,4 +188,9 @@ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientatio
return cliService.fetchResults(opHandle, orientation, maxRows);
}
+ @Override
+ public String getLog(OperationHandle opHandle)
+ throws HiveSQLException {
+ return cliService.getLog(opHandle);
+ }
}
diff --git service/src/java/org/apache/hive/service/cli/ICLIService.java service/src/java/org/apache/hive/service/cli/ICLIService.java
index f647ce6..a9ab35e 100644
--- service/src/java/org/apache/hive/service/cli/ICLIService.java
+++ service/src/java/org/apache/hive/service/cli/ICLIService.java
@@ -91,4 +91,6 @@ public abstract RowSet fetchResults(OperationHandle opHandle, FetchOrientation o
public abstract RowSet fetchResults(OperationHandle opHandle)
throws HiveSQLException;
+ public abstract String getLog(OperationHandle opHandle)
+ throws HiveSQLException;
}
diff --git service/src/java/org/apache/hive/service/cli/log/LinkedStringBuffer.java service/src/java/org/apache/hive/service/cli/log/LinkedStringBuffer.java
new file mode 100644
index 0000000..c4a5106
--- /dev/null
+++ service/src/java/org/apache/hive/service/cli/log/LinkedStringBuffer.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.log;
+
+import java.util.LinkedList;
+
+/**
+ * A linked string buffer with a capacity limit.
+ */
+public class LinkedStringBuffer {
+
+ private final LinkedList list;
+ private final long capacity;
+ private int size;
+
+ /**
+ * Create a buffer with the specified capacity on the number of characters.
+ */
+ public LinkedStringBuffer(long capacity) {
+ this.capacity = capacity;
+ list = new LinkedList();
+ }
+
+ /**
+ * @return Size (number of characters) in the buffer
+ */
+ public synchronized long size() {
+ return size;
+ }
+
+ /**
+ * Write to the buffer, which will remove previously written strings if
+ * we don't fit in capacity.
+ */
+ public synchronized void write(String data) {
+ list.add(data);
+ size += data.length();
+
+ // Trim from the front
+ while (size > capacity) {
+ String evicted = list.remove(0);
+ size -= evicted.length();
+ }
+ }
+
+ /**
+ * @return All the data in the buffer.
+ */
+ public synchronized String read() {
+ StringBuilder sb = new StringBuilder(size);
+ for (String s : list) {
+ sb.append(s);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Remove all stored data.
+ */
+ public synchronized void clear() {
+ list.clear();
+ size = 0;
+ }
+}
diff --git service/src/java/org/apache/hive/service/cli/log/LogDivertAppender.java service/src/java/org/apache/hive/service/cli/log/LogDivertAppender.java
new file mode 100644
index 0000000..5629881
--- /dev/null
+++ service/src/java/org/apache/hive/service/cli/log/LogDivertAppender.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.log;
+
+import java.io.CharArrayWriter;
+
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.WriterAppender;
+import org.apache.log4j.spi.Filter;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * An Appender to divert logs from individual threads to the LogObject they belong to.
+ */
+public class LogDivertAppender extends WriterAppender {
+ private static final Logger LOG = Logger.getLogger(LogDivertAppender.class.getName());
+ private final LogManager logManager;
+
+ /**
+ * A log filter that exclude messages coming from the logger with the given name.
+ * We apply this filter on the Loggers used by the log diversion stuff, so that
+ * they don't generate more logs for themselves when they process logs.
+ */
+ private class NameExclusionFilter extends Filter {
+ private String excludeLoggerName = null;
+
+ public NameExclusionFilter(String excludeLoggerName) {
+ super();
+ this.excludeLoggerName = excludeLoggerName;
+ }
+
+ @Override
+ public int decide(LoggingEvent ev) {
+ if (ev.getLoggerName().equals(excludeLoggerName)) {
+ return Filter.DENY;
+ }
+ return Filter.NEUTRAL;
+ }
+ }
+
+ /** This is where the log message will go to */
+ private final CharArrayWriter writer = new CharArrayWriter();
+
+ public LogDivertAppender(Layout layout, LogManager logManager) {
+ super();
+ this.setLayout(layout);
+ this.setWriter(writer);
+ this.setName("LogDivertAppender");
+ this.logManager = logManager;
+
+ // Filter out messages coming from log processing classes, or we'll run an infinite loop.
+ this.addFilter(new NameExclusionFilter(LOG.getName()));
+ this.addFilter(new NameExclusionFilter(OperationLog.class.getName()));
+ this.addFilter(new NameExclusionFilter(LogManager.class.getName()));
+ }
+
+ /**
+ * Overrides WriterAppender.subAppend(), which does the real logging.
+ * No need to worry about concurrency since log4j calls this synchronously.
+ */
+ @Override
+ protected void subAppend(LoggingEvent event) {
+ super.subAppend(event);
+ // That should've gone into our writer. Notify the LogContext.
+ String logOutput = writer.toString();
+ writer.reset();
+
+ OperationLog log = logManager.getOperationLogByThreadName(event.getThreadName());
+ if (log == null) {
+ LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName());
+ return;
+ }
+ log.writeOperationLog(logOutput);
+ }
+}
diff --git service/src/java/org/apache/hive/service/cli/log/LogManager.java service/src/java/org/apache/hive/service/cli/log/LogManager.java
new file mode 100644
index 0000000..8ed7541
--- /dev/null
+++ service/src/java/org/apache/hive/service/cli/log/LogManager.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.log;
+
+
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.log4j.Appender;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+/**
+ * LogManager - LogManager is responsible for managing the lifecycle of in memory operation logs for HS2.
+ * Each log object is maintained as a rolling log whose size can't exceed 1MB.
+ * LogManager tracks the log objects by operation handle as well as by the thread whose output will
+ * be redirected to these log objects.
+ */
+public class LogManager extends AbstractService {
+ private HiveConf hiveConf;
+
+ private final Map OperationLogByOperation =
+ new ConcurrentHashMap ();
+ private final Map OperationLogByThreadName =
+ new ConcurrentHashMap ();
+
+ private boolean isOperationLogCaptureIntialized = false;
+
+ private static final String DEFAULT_LAYOUT_PATTERN = "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n";
+
+ private static Logger LOG = Logger.getLogger(LogManager.class.getName());
+ private SessionManager sessionManager;
+
+ public LogManager() {
+ super("LogManager");
+ }
+
+ public void setSessionManager(SessionManager sessionManager) {
+ this.sessionManager = sessionManager;
+ }
+
+ public SessionManager getSessionManager() {
+ return sessionManager;
+ }
+
+ public void initOperationLogCapture() {
+ if (isOperationLogCaptureIntialized) {
+ return;
+ }
+
+ // There should be a ConsoleAppender. Copy its Layout.
+ Logger root = Logger.getRootLogger();
+ Layout layout = null;
+
+ Enumeration> appenders = root.getAllAppenders();
+ while (appenders.hasMoreElements()) {
+ Appender ap = (Appender) appenders.nextElement();
+ if (ap.getClass().equals(ConsoleAppender.class)) {
+ layout = ap.getLayout();
+ break;
+ }
+ }
+
+ if (layout == null) {
+ layout = new PatternLayout(DEFAULT_LAYOUT_PATTERN);
+ LOG.info("Cannot find a Layout from a ConsoleAppender. Using default Layout pattern.");
+ }
+
+ // Register another Appender (with the same layout) that talks to us.
+ Appender ap = new LogDivertAppender(layout, this);
+ root.addAppender(ap);
+
+ isOperationLogCaptureIntialized = true;
+ }
+
+ public OperationLog createNewOperationLog(OperationHandle operationHandle, String name) {
+ int size = HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.HIVE_SERVER2_IN_MEM_LOG_SIZE);
+ LOG.info("Operation log size: " + size);
+ OperationLog OperationLog = new OperationLog(name, size);
+ OperationLogByOperation.put(operationHandle, OperationLog);
+ return OperationLog;
+ }
+
+ public boolean destroyOperationLog(OperationHandle operationHandle) {
+ OperationLog OperationLog = OperationLogByOperation.remove(operationHandle);
+ if (OperationLog == null) {
+ LOG.debug("No OperationLog found for operation: " + operationHandle.hashCode());
+ return false;
+ }
+ return true;
+ }
+
+ public void registerCurrentThread(OperationHandle operationHandle) throws HiveSQLException {
+ String threadName = Thread.currentThread().getName();
+
+ OperationLog OperationLog = getOperationLogByOperation(operationHandle, true);
+
+ if (OperationLogByThreadName.containsKey(threadName)) {
+ LOG.debug("Thread: " + threadName + " is already registered.");
+ }
+ OperationLogByThreadName.put(threadName, OperationLog);
+ }
+
+ public void registerCurrentThread(OperationLog OperationLog) {
+ String threadName = Thread.currentThread().getName();
+ OperationLogByThreadName.put(threadName, OperationLog);
+ }
+
+ public boolean unregisterCurrentThread() {
+ String threadName = Thread.currentThread().getName();
+ OperationLog OperationLog = OperationLogByThreadName.remove(threadName);
+ if (OperationLog == null) {
+ LOG.debug("Failed to unregister thread " + threadName + ": OperationLog object is currently "
+ + "not regsitered");
+ return false;
+ }
+ return true;
+ }
+
+ public OperationLog getOperationLogByThreadName(String threadName) {
+ OperationLog OperationLog = OperationLogByThreadName.get(threadName);
+ if (OperationLog == null) {
+ LOG.debug("Operation log assocaited with thread: " + threadName + " couldn't be found.");
+ }
+ return OperationLog;
+ }
+
+ public OperationLog getOperationLogByOperation(OperationHandle operationHandle,
+ boolean createIfAbsent) throws HiveSQLException {
+ OperationLog operationLog = OperationLogByOperation.get(operationHandle);
+ if (operationLog == null && createIfAbsent) {
+ operationLog = createNewOperationLog(operationHandle, operationHandle.toString());
+ } else if (operationLog == null) {
+ throw new HiveSQLException("Couldn't find log associated with operation handle: " +
+ operationHandle.toString());
+ }
+ return operationLog;
+ }
+
+ @Override
+ public synchronized void init(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+ super.init(hiveConf);
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_SERVER2_IN_MEM_LOGGING)) {
+ initOperationLogCapture();
+ } else {
+ LOG.info("Opeation level logging is turned off");
+ }
+ }
+
+ @Override
+ public synchronized void start() {
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ super.stop();
+ }
+}
diff --git service/src/java/org/apache/hive/service/cli/log/OperationLog.java service/src/java/org/apache/hive/service/cli/log/OperationLog.java
new file mode 100644
index 0000000..d2351a7
--- /dev/null
+++ service/src/java/org/apache/hive/service/cli/log/OperationLog.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.log;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Charsets;
+
+public class OperationLog {
+
+ private static final String HIVE_ENCODING = Charsets.UTF_8.name();
+
+ // This OperationLogger's name is added to an exclusion list in OperationLogDivertAppender
+ private static Logger LOG = Logger.getLogger(OperationLog.class.getName());
+
+ private final String operationLogName;
+ private final LinkedStringBuffer operationLogBuffer;
+ private final long creationTime;
+
+ OperationLog(String name, int size) {
+ this.operationLogName = name;
+ this.operationLogBuffer = new LinkedStringBuffer(size);
+ this.creationTime = System.currentTimeMillis();
+ }
+
+ public void writeOperationLog(String OperationLogMessage) {
+ operationLogBuffer.write(OperationLogMessage);
+ }
+
+ public String readOperationLog() {
+ return operationLogBuffer.read();
+ }
+
+ public void resetOperationLog() {
+ operationLogBuffer.clear();
+ }
+
+ /**
+ * The OperationLogOutputStream helps translate a OperationLog to an OutputStream.
+ */
+ private static class OperationLogOutputStream extends OutputStream {
+ private final LinkedStringBuffer backingStore;
+
+ public OperationLogOutputStream(LinkedStringBuffer operationLogBuffer) {
+ super();
+ backingStore = operationLogBuffer;
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ backingStore.write(new String(b, HIVE_ENCODING));
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ backingStore.write(new String(b, off, len, HIVE_ENCODING));
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ byte[] buf = { (byte) b };
+ this.write(buf);
+ }
+ }
+
+ public OutputStream getOutputStream() {
+ return new OperationLogOutputStream(operationLogBuffer);
+ }
+
+ public String getName() {
+ return operationLogName;
+ }
+}
\ No newline at end of file
diff --git service/src/java/org/apache/hive/service/cli/operation/OperationManager.java service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 1f78a1d..2805fee 100644
--- service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -31,6 +31,7 @@
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.TableSchema;
import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.cli.session.SessionManager;
/**
* OperationManager.
@@ -41,6 +42,7 @@
private HiveConf hiveConf;
private final Map handleToOperation =
new HashMap();
+ private SessionManager sessionManager;
public OperationManager() {
super("OperationManager");
@@ -65,6 +67,13 @@ public synchronized void stop() {
super.stop();
}
+ public void setSessionManager(SessionManager sessionManager) {
+ this.sessionManager = sessionManager;
+ }
+
+ public SessionManager getSessionManager() {
+ return sessionManager;
+ }
public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
String statement, Map confOverlay, boolean runAsync) {
ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation
@@ -168,5 +177,5 @@ public RowSet getOperationNextRowSet(OperationHandle opHandle,
FetchOrientation orientation, long maxRows)
throws HiveSQLException {
return getOperation(opHandle).getNextRowSet(orientation, maxRows);
- }
+ }
}
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSession.java service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index 00058cc..901ac05 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -32,6 +32,7 @@
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.log.LogManager;
import org.apache.hive.service.cli.operation.OperationManager;
public interface HiveSession {
@@ -179,4 +180,9 @@ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientatio
public String getUserName();
public void setUserName(String userName);
+
+ public void setLogManager(LogManager logManager);
+
+ public LogManager getLogManager();
+
}
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 11c96b2..f714d3d 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -43,6 +43,7 @@
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.log.LogManager;
import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
import org.apache.hive.service.cli.operation.GetCatalogsOperation;
import org.apache.hive.service.cli.operation.GetColumnsOperation;
@@ -73,6 +74,7 @@
private SessionManager sessionManager;
private OperationManager operationManager;
+ private LogManager logManager;
private IMetaStoreClient metastoreClient = null;
private final Set opHandleSet = new HashSet();
@@ -133,6 +135,14 @@ public HiveConf getHiveConf() {
return hiveConf;
}
+ public LogManager getLogManager() {
+ return logManager;
+ }
+
+ public void setLogManager(LogManager logManager) {
+ this.logManager = logManager;
+ }
+
public IMetaStoreClient getMetaStoreClient() throws HiveSQLException {
if (metastoreClient == null) {
try {
@@ -163,7 +173,7 @@ public GetInfoValue getInfo(GetInfoType getInfoType)
return new GetInfoValue(128);
case CLI_TXN_CAPABLE:
default:
- throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
+ throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
}
} finally {
release();
@@ -187,8 +197,12 @@ private OperationHandle executeStatementInternal(String statement, Map handleToSession = new HashMap();
private OperationManager operationManager = new OperationManager();
+ private LogManager logManager = new LogManager();
private static final Object sessionMapLock = new Object();
private ExecutorService backgroundOperationPool;
@@ -60,6 +62,10 @@ public synchronized void init(HiveConf hiveConf) {
LOG.info("HiveServer2: Async execution pool size" + backgroundPoolSize);
backgroundOperationPool = Executors.newFixedThreadPool(backgroundPoolSize);
addService(operationManager);
+ logManager = new LogManager();
+ logManager.setSessionManager(this);
+
+ addService(logManager);
super.init(hiveConf);
}
@@ -107,6 +113,7 @@ public SessionHandle openSession(String username, String password, Map threadLocalIpAddress = new ThreadLocal() {
@Override
protected synchronized String initialValue() {
diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 2f2866f..4c1edec 100644
--- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -58,7 +58,6 @@
public static final Log LOG = LogFactory.getLog(ThriftCLIService.class.getName());
-
protected CLIService cliService;
private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
private static final TStatus ERROR_STATUS = new TStatus(TStatusCode.ERROR_STATUS);
@@ -108,6 +107,20 @@ public synchronized void stop() {
super.stop();
}
+ @Override
+ public TGetLogResp GetLog(TGetLogReq req) throws TException {
+ TGetLogResp resp = new TGetLogResp();
+ try {
+ String log = cliService.getLog(new OperationHandle(req.getOperationHandle()));
+ resp.setStatus(OK_STATUS);
+ resp.setLog(log);
+ } catch (Exception e) {
+ e.printStackTrace();
+ resp.setStatus(HiveSQLException.toTStatus(e));
+ }
+ return resp;
+ }
+
@Override
public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
@@ -442,4 +455,6 @@ public void run() {
}
}
+
+
}
diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
index 9bb2a0f..24ad23f 100644
--- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
+++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
@@ -388,4 +388,19 @@ public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
// TODO: set the correct default fetch size
return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000);
}
+
+ @Override
+ public String getLog(OperationHandle opHandle) throws HiveSQLException {
+ try {
+ TGetLogReq req = new TGetLogReq();
+ req.setOperationHandle(opHandle.toTOperationHandle());
+ TGetLogResp resp = cliService.GetLog(req);
+ checkStatus(resp.getStatus());
+ return new String(resp.getLog());
+ } catch (HiveSQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveSQLException(e);
+ }
+ }
}