diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e971644..1dcd67d 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -763,6 +763,8 @@ HIVE_SERVER2_TABLE_TYPE_MAPPING("hive.server2.table.type.mapping", "CLASSIC"), HIVE_SERVER2_SESSION_HOOK("hive.server2.session.hook", ""), + HIVE_SERVER2_IN_MEM_LOGGING("hive.server2.in.mem.logging", true), + HIVE_SERVER2_IN_MEM_LOG_SIZE("hive.server2.in.mem.log.size", 128 * 1024), HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list", null), // If this is set all move tasks at the end of a multi-insert query will only begin once all diff --git conf/hive-default.xml.template conf/hive-default.xml.template index 1ee756c..92d1065 100644 --- conf/hive-default.xml.template +++ conf/hive-default.xml.template @@ -1780,6 +1780,22 @@ + hive.server2.in.mem.logging + true + + Whether to turn on hiveserver2 in memory logging + + + + + hive.server2.in.mem.log.size + 131072 + + Maximum size of the hiveserver2 in memory query log. Note that the size is per query. + + + + hive.decode.partition.name false Whether to show the unquoted partition names in query results. diff --git jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 2912ece..6b9cba3 100644 --- jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -32,6 +32,8 @@ import org.apache.hive.service.cli.thrift.TCloseOperationResp; import org.apache.hive.service.cli.thrift.TExecuteStatementReq; import org.apache.hive.service.cli.thrift.TExecuteStatementResp; +import org.apache.hive.service.cli.thrift.TGetLogReq; +import org.apache.hive.service.cli.thrift.TGetLogResp; import org.apache.hive.service.cli.thrift.TGetOperationStatusReq; import org.apache.hive.service.cli.thrift.TGetOperationStatusResp; import org.apache.hive.service.cli.thrift.TOperationHandle; @@ -280,6 +282,25 @@ public boolean execute(String sql, String[] columnNames) throws SQLException { throw new SQLException("Method not supported"); } + public String getLog() throws SQLException { + if (isClosed) { + throw new SQLException("Can't get log for statement after statement has been closed"); + } + + TGetLogReq getLogReq = new TGetLogReq(); + TGetLogResp getLogResp; + getLogReq.setOperationHandle(stmtHandle); + try { + getLogResp = client.GetLog(getLogReq); + Utils.verifySuccessWithInfo(getLogResp.getStatus()); + } catch (SQLException e) { + throw e; + } catch (Exception e) { + throw new SQLException(e.toString(), "08S01", e); + } + return getLogResp.getLog(); + } + /* * (non-Javadoc) * diff --git jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java index 09ab3c2..be592c5 100644 --- jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java +++ jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -806,6 +806,30 @@ private void doTestErrorCase(String sql, String expectedMessage, exceptionFound); } + public void testGetLog() throws Exception { + HiveStatement stmt = (HiveStatement)con.createStatement(); + assertNotNull("Statement is null", stmt); + + ResultSet res = stmt.executeQuery("select count(*) from " + tableName); + ResultSetMetaData meta = res.getMetaData(); + + boolean moreRow = res.next(); + while (moreRow) { + try { + moreRow = res.next(); + } catch (SQLException e) { + throw e; + } + } + + String log = stmt.getLog(); + assertTrue("Operation Log looks incorrect" , + log.contains("Parsing command: select count(*) from testHiveJdbcDriver_Table")); + assertTrue("Operation Log looks incorrect", + log.contains( "select count(*) from testHiveJdbcDriver_Table")); + + } + public void testShowTables() throws SQLException { Statement stmt = con.createStatement(); assertNotNull("Statement is null", stmt); diff --git service/if/TCLIService.thrift service/if/TCLIService.thrift index 6e20375..db50266 100644 --- service/if/TCLIService.thrift +++ service/if/TCLIService.thrift @@ -208,7 +208,7 @@ struct TUnionTypeEntry { } struct TUserDefinedTypeEntry { - // The fully qualified name of the class implementing this type. + // The fully qualified name of the class implementing this type. 1: required string typeClassName } @@ -995,6 +995,22 @@ struct TFetchResultsResp { 3: optional TRowSet results } +// GetLog() +// +// Fetch operation log from the server corresponding to +// a particular OperationHandle. +struct TGetLogReq { + // Operation whose log is requested + 1: required TOperationHandle operationHandle +} + +struct TGetLogResp { + 1: required TStatus status + + 2: required string log + +} + service TCLIService { TOpenSessionResp OpenSession(1:TOpenSessionReq req); @@ -1028,4 +1044,6 @@ service TCLIService { TGetResultSetMetadataResp GetResultSetMetadata(1:TGetResultSetMetadataReq req); TFetchResultsResp FetchResults(1:TFetchResultsReq req); + + TGetLogResp GetLog(1:TGetLogReq req); } diff --git service/src/java/org/apache/hive/service/cli/CLIService.java service/src/java/org/apache/hive/service/cli/CLIService.java index 1a7f338..16ec330 100644 --- service/src/java/org/apache/hive/service/cli/CLIService.java +++ service/src/java/org/apache/hive/service/cli/CLIService.java @@ -35,6 +35,7 @@ import org.apache.hive.service.CompositeService; import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; +import org.apache.hive.service.cli.log.OperationLog; import org.apache.hive.service.cli.session.SessionManager; /** @@ -258,8 +259,10 @@ public OperationHandle getFunctions(SessionHandle sessionHandle, @Override public OperationState getOperationStatus(OperationHandle opHandle) throws HiveSQLException { + startLogCapture(opHandle); OperationState opState = sessionManager.getOperationManager().getOperationState(opHandle); LOG.info(opHandle + ": getOperationStatus()"); + stopLogCapture(); return opState; } @@ -269,9 +272,12 @@ public OperationState getOperationStatus(OperationHandle opHandle) @Override public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { + startLogCapture(opHandle); sessionManager.getOperationManager().getOperation(opHandle). getParentSession().cancelOperation(opHandle); LOG.info(opHandle + ": cancelOperation()"); + sessionManager.getLogManager().destroyOperationLog(opHandle); + stopLogCapture(); } /* (non-Javadoc) @@ -280,9 +286,12 @@ public void cancelOperation(OperationHandle opHandle) @Override public void closeOperation(OperationHandle opHandle) throws HiveSQLException { + startLogCapture(opHandle); sessionManager.getOperationManager().getOperation(opHandle). getParentSession().closeOperation(opHandle); LOG.info(opHandle + ": closeOperation"); + sessionManager.getLogManager().destroyOperationLog(opHandle); + stopLogCapture(); } /* (non-Javadoc) @@ -291,9 +300,11 @@ public void closeOperation(OperationHandle opHandle) @Override public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException { + startLogCapture(opHandle); TableSchema tableSchema = sessionManager.getOperationManager().getOperation(opHandle). getParentSession().getResultSetMetadata(opHandle); LOG.info(opHandle + ": getResultSetMetadata()"); + stopLogCapture(); return tableSchema; } @@ -303,9 +314,11 @@ public TableSchema getResultSetMetadata(OperationHandle opHandle) @Override public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) throws HiveSQLException { + startLogCapture(opHandle); RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle). getParentSession().fetchResults(opHandle, orientation, maxRows); LOG.info(opHandle + ": fetchResults()"); + stopLogCapture(); return rowSet; } @@ -315,9 +328,11 @@ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientatio @Override public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { + startLogCapture(opHandle); RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle). getParentSession().fetchResults(opHandle); LOG.info(opHandle + ": fetchResults()"); + stopLogCapture(); return rowSet; } @@ -341,4 +356,24 @@ public synchronized String getDelegationTokenFromMetaStore(String owner) } } } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.ICLIService#getLog(org.apache.hive.service.cli.OperationHandle) + */ + @Override + public String getLog(OperationHandle opHandle) + throws HiveSQLException { + OperationLog log = sessionManager.getLogManager().getOperationLogByOperation(opHandle, false); + LOG.info(opHandle + ": getLog()"); + return log.readOperationLog(); + } + + private void startLogCapture(OperationHandle operationHandle) throws HiveSQLException { + sessionManager.getLogManager().unregisterCurrentThread(); + sessionManager.getLogManager().registerCurrentThread(operationHandle); + } + + private void stopLogCapture() { + sessionManager.getLogManager().unregisterCurrentThread(); + } } diff --git service/src/java/org/apache/hive/service/cli/CLIServiceClient.java service/src/java/org/apache/hive/service/cli/CLIServiceClient.java index 14ef54f..8bd23bf 100644 --- service/src/java/org/apache/hive/service/cli/CLIServiceClient.java +++ service/src/java/org/apache/hive/service/cli/CLIServiceClient.java @@ -158,4 +158,7 @@ public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000); } + @Override + public abstract String getLog(OperationHandle opHandle) throws HiveSQLException; + } diff --git service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java index 9dca874..4a725b1 100644 --- service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java +++ service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java @@ -188,4 +188,9 @@ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientatio return cliService.fetchResults(opHandle, orientation, maxRows); } + @Override + public String getLog(OperationHandle opHandle) + throws HiveSQLException { + return cliService.getLog(opHandle); + } } diff --git service/src/java/org/apache/hive/service/cli/ICLIService.java service/src/java/org/apache/hive/service/cli/ICLIService.java index f647ce6..a9ab35e 100644 --- service/src/java/org/apache/hive/service/cli/ICLIService.java +++ service/src/java/org/apache/hive/service/cli/ICLIService.java @@ -91,4 +91,6 @@ public abstract RowSet fetchResults(OperationHandle opHandle, FetchOrientation o public abstract RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException; + public abstract String getLog(OperationHandle opHandle) + throws HiveSQLException; } diff --git service/src/java/org/apache/hive/service/cli/log/LinkedStringBuffer.java service/src/java/org/apache/hive/service/cli/log/LinkedStringBuffer.java new file mode 100644 index 0000000..c4a5106 --- /dev/null +++ service/src/java/org/apache/hive/service/cli/log/LinkedStringBuffer.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.log; + +import java.util.LinkedList; + +/** + * A linked string buffer with a capacity limit. + */ +public class LinkedStringBuffer { + + private final LinkedList list; + private final long capacity; + private int size; + + /** + * Create a buffer with the specified capacity on the number of characters. + */ + public LinkedStringBuffer(long capacity) { + this.capacity = capacity; + list = new LinkedList(); + } + + /** + * @return Size (number of characters) in the buffer + */ + public synchronized long size() { + return size; + } + + /** + * Write to the buffer, which will remove previously written strings if + * we don't fit in capacity. + */ + public synchronized void write(String data) { + list.add(data); + size += data.length(); + + // Trim from the front + while (size > capacity) { + String evicted = list.remove(0); + size -= evicted.length(); + } + } + + /** + * @return All the data in the buffer. + */ + public synchronized String read() { + StringBuilder sb = new StringBuilder(size); + for (String s : list) { + sb.append(s); + } + return sb.toString(); + } + + /** + * Remove all stored data. + */ + public synchronized void clear() { + list.clear(); + size = 0; + } +} diff --git service/src/java/org/apache/hive/service/cli/log/LogDivertAppender.java service/src/java/org/apache/hive/service/cli/log/LogDivertAppender.java new file mode 100644 index 0000000..5629881 --- /dev/null +++ service/src/java/org/apache/hive/service/cli/log/LogDivertAppender.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.log; + +import java.io.CharArrayWriter; + +import org.apache.log4j.Layout; +import org.apache.log4j.Logger; +import org.apache.log4j.WriterAppender; +import org.apache.log4j.spi.Filter; +import org.apache.log4j.spi.LoggingEvent; + +/** + * An Appender to divert logs from individual threads to the LogObject they belong to. + */ +public class LogDivertAppender extends WriterAppender { + private static final Logger LOG = Logger.getLogger(LogDivertAppender.class.getName()); + private final LogManager logManager; + + /** + * A log filter that exclude messages coming from the logger with the given name. + * We apply this filter on the Loggers used by the log diversion stuff, so that + * they don't generate more logs for themselves when they process logs. + */ + private class NameExclusionFilter extends Filter { + private String excludeLoggerName = null; + + public NameExclusionFilter(String excludeLoggerName) { + super(); + this.excludeLoggerName = excludeLoggerName; + } + + @Override + public int decide(LoggingEvent ev) { + if (ev.getLoggerName().equals(excludeLoggerName)) { + return Filter.DENY; + } + return Filter.NEUTRAL; + } + } + + /** This is where the log message will go to */ + private final CharArrayWriter writer = new CharArrayWriter(); + + public LogDivertAppender(Layout layout, LogManager logManager) { + super(); + this.setLayout(layout); + this.setWriter(writer); + this.setName("LogDivertAppender"); + this.logManager = logManager; + + // Filter out messages coming from log processing classes, or we'll run an infinite loop. + this.addFilter(new NameExclusionFilter(LOG.getName())); + this.addFilter(new NameExclusionFilter(OperationLog.class.getName())); + this.addFilter(new NameExclusionFilter(LogManager.class.getName())); + } + + /** + * Overrides WriterAppender.subAppend(), which does the real logging. + * No need to worry about concurrency since log4j calls this synchronously. + */ + @Override + protected void subAppend(LoggingEvent event) { + super.subAppend(event); + // That should've gone into our writer. Notify the LogContext. + String logOutput = writer.toString(); + writer.reset(); + + OperationLog log = logManager.getOperationLogByThreadName(event.getThreadName()); + if (log == null) { + LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName()); + return; + } + log.writeOperationLog(logOutput); + } +} diff --git service/src/java/org/apache/hive/service/cli/log/LogManager.java service/src/java/org/apache/hive/service/cli/log/LogManager.java new file mode 100644 index 0000000..8ed7541 --- /dev/null +++ service/src/java/org/apache/hive/service/cli/log/LogManager.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.log; + + +import java.util.Enumeration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.AbstractService; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.OperationHandle; +import org.apache.hive.service.cli.session.SessionManager; +import org.apache.log4j.Appender; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Layout; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; + +/** + * LogManager - LogManager is responsible for managing the lifecycle of in memory operation logs for HS2. + * Each log object is maintained as a rolling log whose size can't exceed 1MB. + * LogManager tracks the log objects by operation handle as well as by the thread whose output will + * be redirected to these log objects. + */ +public class LogManager extends AbstractService { + private HiveConf hiveConf; + + private final Map OperationLogByOperation = + new ConcurrentHashMap (); + private final Map OperationLogByThreadName = + new ConcurrentHashMap (); + + private boolean isOperationLogCaptureIntialized = false; + + private static final String DEFAULT_LAYOUT_PATTERN = "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n"; + + private static Logger LOG = Logger.getLogger(LogManager.class.getName()); + private SessionManager sessionManager; + + public LogManager() { + super("LogManager"); + } + + public void setSessionManager(SessionManager sessionManager) { + this.sessionManager = sessionManager; + } + + public SessionManager getSessionManager() { + return sessionManager; + } + + public void initOperationLogCapture() { + if (isOperationLogCaptureIntialized) { + return; + } + + // There should be a ConsoleAppender. Copy its Layout. + Logger root = Logger.getRootLogger(); + Layout layout = null; + + Enumeration appenders = root.getAllAppenders(); + while (appenders.hasMoreElements()) { + Appender ap = (Appender) appenders.nextElement(); + if (ap.getClass().equals(ConsoleAppender.class)) { + layout = ap.getLayout(); + break; + } + } + + if (layout == null) { + layout = new PatternLayout(DEFAULT_LAYOUT_PATTERN); + LOG.info("Cannot find a Layout from a ConsoleAppender. Using default Layout pattern."); + } + + // Register another Appender (with the same layout) that talks to us. + Appender ap = new LogDivertAppender(layout, this); + root.addAppender(ap); + + isOperationLogCaptureIntialized = true; + } + + public OperationLog createNewOperationLog(OperationHandle operationHandle, String name) { + int size = HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.HIVE_SERVER2_IN_MEM_LOG_SIZE); + LOG.info("Operation log size: " + size); + OperationLog OperationLog = new OperationLog(name, size); + OperationLogByOperation.put(operationHandle, OperationLog); + return OperationLog; + } + + public boolean destroyOperationLog(OperationHandle operationHandle) { + OperationLog OperationLog = OperationLogByOperation.remove(operationHandle); + if (OperationLog == null) { + LOG.debug("No OperationLog found for operation: " + operationHandle.hashCode()); + return false; + } + return true; + } + + public void registerCurrentThread(OperationHandle operationHandle) throws HiveSQLException { + String threadName = Thread.currentThread().getName(); + + OperationLog OperationLog = getOperationLogByOperation(operationHandle, true); + + if (OperationLogByThreadName.containsKey(threadName)) { + LOG.debug("Thread: " + threadName + " is already registered."); + } + OperationLogByThreadName.put(threadName, OperationLog); + } + + public void registerCurrentThread(OperationLog OperationLog) { + String threadName = Thread.currentThread().getName(); + OperationLogByThreadName.put(threadName, OperationLog); + } + + public boolean unregisterCurrentThread() { + String threadName = Thread.currentThread().getName(); + OperationLog OperationLog = OperationLogByThreadName.remove(threadName); + if (OperationLog == null) { + LOG.debug("Failed to unregister thread " + threadName + ": OperationLog object is currently " + + "not regsitered"); + return false; + } + return true; + } + + public OperationLog getOperationLogByThreadName(String threadName) { + OperationLog OperationLog = OperationLogByThreadName.get(threadName); + if (OperationLog == null) { + LOG.debug("Operation log assocaited with thread: " + threadName + " couldn't be found."); + } + return OperationLog; + } + + public OperationLog getOperationLogByOperation(OperationHandle operationHandle, + boolean createIfAbsent) throws HiveSQLException { + OperationLog operationLog = OperationLogByOperation.get(operationHandle); + if (operationLog == null && createIfAbsent) { + operationLog = createNewOperationLog(operationHandle, operationHandle.toString()); + } else if (operationLog == null) { + throw new HiveSQLException("Couldn't find log associated with operation handle: " + + operationHandle.toString()); + } + return operationLog; + } + + @Override + public synchronized void init(HiveConf hiveConf) { + this.hiveConf = hiveConf; + super.init(hiveConf); + if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_SERVER2_IN_MEM_LOGGING)) { + initOperationLogCapture(); + } else { + LOG.info("Opeation level logging is turned off"); + } + } + + @Override + public synchronized void start() { + super.start(); + } + + @Override + public synchronized void stop() { + super.stop(); + } +} diff --git service/src/java/org/apache/hive/service/cli/log/OperationLog.java service/src/java/org/apache/hive/service/cli/log/OperationLog.java new file mode 100644 index 0000000..d2351a7 --- /dev/null +++ service/src/java/org/apache/hive/service/cli/log/OperationLog.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.log; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.log4j.Logger; + +import com.google.common.base.Charsets; + +public class OperationLog { + + private static final String HIVE_ENCODING = Charsets.UTF_8.name(); + + // This OperationLogger's name is added to an exclusion list in OperationLogDivertAppender + private static Logger LOG = Logger.getLogger(OperationLog.class.getName()); + + private final String operationLogName; + private final LinkedStringBuffer operationLogBuffer; + private final long creationTime; + + OperationLog(String name, int size) { + this.operationLogName = name; + this.operationLogBuffer = new LinkedStringBuffer(size); + this.creationTime = System.currentTimeMillis(); + } + + public void writeOperationLog(String OperationLogMessage) { + operationLogBuffer.write(OperationLogMessage); + } + + public String readOperationLog() { + return operationLogBuffer.read(); + } + + public void resetOperationLog() { + operationLogBuffer.clear(); + } + + /** + * The OperationLogOutputStream helps translate a OperationLog to an OutputStream. + */ + private static class OperationLogOutputStream extends OutputStream { + private final LinkedStringBuffer backingStore; + + public OperationLogOutputStream(LinkedStringBuffer operationLogBuffer) { + super(); + backingStore = operationLogBuffer; + } + + @Override + public void write(byte[] b) throws IOException { + backingStore.write(new String(b, HIVE_ENCODING)); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + backingStore.write(new String(b, off, len, HIVE_ENCODING)); + } + + @Override + public void write(int b) throws IOException { + byte[] buf = { (byte) b }; + this.write(buf); + } + } + + public OutputStream getOutputStream() { + return new OperationLogOutputStream(operationLogBuffer); + } + + public String getName() { + return operationLogName; + } +} \ No newline at end of file diff --git service/src/java/org/apache/hive/service/cli/operation/OperationManager.java service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 1f78a1d..2805fee 100644 --- service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -31,6 +31,7 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.cli.session.SessionManager; /** * OperationManager. @@ -41,6 +42,7 @@ private HiveConf hiveConf; private final Map handleToOperation = new HashMap(); + private SessionManager sessionManager; public OperationManager() { super("OperationManager"); @@ -65,6 +67,13 @@ public synchronized void stop() { super.stop(); } + public void setSessionManager(SessionManager sessionManager) { + this.sessionManager = sessionManager; + } + + public SessionManager getSessionManager() { + return sessionManager; + } public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, String statement, Map confOverlay, boolean runAsync) { ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation @@ -168,5 +177,5 @@ public RowSet getOperationNextRowSet(OperationHandle opHandle, FetchOrientation orientation, long maxRows) throws HiveSQLException { return getOperation(opHandle).getNextRowSet(orientation, maxRows); - } + } } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSession.java service/src/java/org/apache/hive/service/cli/session/HiveSession.java index 00058cc..901ac05 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSession.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSession.java @@ -32,6 +32,7 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.log.LogManager; import org.apache.hive.service.cli.operation.OperationManager; public interface HiveSession { @@ -179,4 +180,9 @@ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientatio public String getUserName(); public void setUserName(String userName); + + public void setLogManager(LogManager logManager); + + public LogManager getLogManager(); + } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 11c96b2..f714d3d 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -43,6 +43,7 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.log.LogManager; import org.apache.hive.service.cli.operation.ExecuteStatementOperation; import org.apache.hive.service.cli.operation.GetCatalogsOperation; import org.apache.hive.service.cli.operation.GetColumnsOperation; @@ -73,6 +74,7 @@ private SessionManager sessionManager; private OperationManager operationManager; + private LogManager logManager; private IMetaStoreClient metastoreClient = null; private final Set opHandleSet = new HashSet(); @@ -133,6 +135,14 @@ public HiveConf getHiveConf() { return hiveConf; } + public LogManager getLogManager() { + return logManager; + } + + public void setLogManager(LogManager logManager) { + this.logManager = logManager; + } + public IMetaStoreClient getMetaStoreClient() throws HiveSQLException { if (metastoreClient == null) { try { @@ -163,7 +173,7 @@ public GetInfoValue getInfo(GetInfoType getInfoType) return new GetInfoValue(128); case CLI_TXN_CAPABLE: default: - throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString()); + throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString()); } } finally { release(); @@ -187,8 +197,12 @@ private OperationHandle executeStatementInternal(String statement, Map handleToSession = new HashMap(); private OperationManager operationManager = new OperationManager(); + private LogManager logManager = new LogManager(); private static final Object sessionMapLock = new Object(); private ExecutorService backgroundOperationPool; @@ -60,6 +62,10 @@ public synchronized void init(HiveConf hiveConf) { LOG.info("HiveServer2: Async execution pool size" + backgroundPoolSize); backgroundOperationPool = Executors.newFixedThreadPool(backgroundPoolSize); addService(operationManager); + logManager = new LogManager(); + logManager.setSessionManager(this); + + addService(logManager); super.init(hiveConf); } @@ -107,6 +113,7 @@ public SessionHandle openSession(String username, String password, Map threadLocalIpAddress = new ThreadLocal() { @Override protected synchronized String initialValue() { diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 2f2866f..4c1edec 100644 --- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -58,7 +58,6 @@ public static final Log LOG = LogFactory.getLog(ThriftCLIService.class.getName()); - protected CLIService cliService; private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS); private static final TStatus ERROR_STATUS = new TStatus(TStatusCode.ERROR_STATUS); @@ -108,6 +107,20 @@ public synchronized void stop() { super.stop(); } + @Override + public TGetLogResp GetLog(TGetLogReq req) throws TException { + TGetLogResp resp = new TGetLogResp(); + try { + String log = cliService.getLog(new OperationHandle(req.getOperationHandle())); + resp.setStatus(OK_STATUS); + resp.setLog(log); + } catch (Exception e) { + e.printStackTrace(); + resp.setStatus(HiveSQLException.toTStatus(e)); + } + return resp; + } + @Override public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { @@ -442,4 +455,6 @@ public void run() { } } + + } diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java index 9bb2a0f..24ad23f 100644 --- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java @@ -388,4 +388,19 @@ public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { // TODO: set the correct default fetch size return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000); } + + @Override + public String getLog(OperationHandle opHandle) throws HiveSQLException { + try { + TGetLogReq req = new TGetLogReq(); + req.setOperationHandle(opHandle.toTOperationHandle()); + TGetLogResp resp = cliService.GetLog(req); + checkStatus(resp.getStatus()); + return new String(resp.getLog()); + } catch (HiveSQLException e) { + throw e; + } catch (Exception e) { + throw new HiveSQLException(e); + } + } }