Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1620975) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -1404,6 +1404,14 @@ "to construct a list exception handlers to handle exceptions thrown\n" + "by record readers"), + // operation log configuration + HIVE_SERVER2_LOGGING_OPERATION_ENABLED("hive.server2.logging.operation.enabled", true, + "When true, HS2 will save operation logs"), + HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION("hive.server2.logging.operation.log.location", + "${system:java.io.tmpdir}" + File.separator + "${system:user.name}" + File.separator + + "operation_logs", + "Top level directory where operation logs are stored if logging functionality is enabled"), + // logging configuration HIVE_LOG4J_FILE("hive.log4j.file", "", "Hive log4j configuration file.\n" + Index: service/if/TCLIService.thrift =================================================================== --- service/if/TCLIService.thrift (revision 1620975) +++ service/if/TCLIService.thrift (working copy) @@ -1054,6 +1054,9 @@ // Max number of rows that should be returned in // the rowset. 3: required i64 maxRows + + // The type of a fetch results request. 0 represents Query output. 1 represents Log + 4: optional i16 fetchType = 0 } struct TFetchResultsResp { Index: service/src/gen/thrift/gen-cpp/TCLIService_types.cpp =================================================================== --- service/src/gen/thrift/gen-cpp/TCLIService_types.cpp (revision 1620975) +++ service/src/gen/thrift/gen-cpp/TCLIService_types.cpp (working copy) @@ -6137,8 +6137,8 @@ swap(a.__isset, b.__isset); } -const char* TFetchResultsReq::ascii_fingerprint = "1B96A8C05BA9DD699FC8CD842240ABDE"; -const uint8_t TFetchResultsReq::binary_fingerprint[16] = {0x1B,0x96,0xA8,0xC0,0x5B,0xA9,0xDD,0x69,0x9F,0xC8,0xCD,0x84,0x22,0x40,0xAB,0xDE}; +const char* TFetchResultsReq::ascii_fingerprint = "B4CB1E4F8F8F4D50183DD372AD11753A"; +const uint8_t TFetchResultsReq::binary_fingerprint[16] = {0xB4,0xCB,0x1E,0x4F,0x8F,0x8F,0x4D,0x50,0x18,0x3D,0xD3,0x72,0xAD,0x11,0x75,0x3A}; uint32_t TFetchResultsReq::read(::apache::thrift::protocol::TProtocol* iprot) { @@ -6189,6 +6189,14 @@ xfer += iprot->skip(ftype); } break; + case 4: + if (ftype == ::apache::thrift::protocol::T_I16) { + xfer += iprot->readI16(this->fetchType); + this->__isset.fetchType = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -6223,6 +6231,11 @@ xfer += oprot->writeI64(this->maxRows); xfer += oprot->writeFieldEnd(); + if (this->__isset.fetchType) { + xfer += oprot->writeFieldBegin("fetchType", ::apache::thrift::protocol::T_I16, 4); + xfer += oprot->writeI16(this->fetchType); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -6233,6 +6246,8 @@ swap(a.operationHandle, b.operationHandle); swap(a.orientation, b.orientation); swap(a.maxRows, b.maxRows); + swap(a.fetchType, b.fetchType); + swap(a.__isset, b.__isset); } const char* TFetchResultsResp::ascii_fingerprint = "FC43BC2D6F3B76D4DB0F34226A745C8E"; Index: service/src/gen/thrift/gen-cpp/TCLIService_types.h =================================================================== --- service/src/gen/thrift/gen-cpp/TCLIService_types.h (revision 1620975) +++ service/src/gen/thrift/gen-cpp/TCLIService_types.h (working copy) @@ -3602,14 +3602,18 @@ 
void swap(TGetResultSetMetadataResp &a, TGetResultSetMetadataResp &b); +typedef struct _TFetchResultsReq__isset { + _TFetchResultsReq__isset() : fetchType(true) {} + bool fetchType; +} _TFetchResultsReq__isset; class TFetchResultsReq { public: - static const char* ascii_fingerprint; // = "1B96A8C05BA9DD699FC8CD842240ABDE"; - static const uint8_t binary_fingerprint[16]; // = {0x1B,0x96,0xA8,0xC0,0x5B,0xA9,0xDD,0x69,0x9F,0xC8,0xCD,0x84,0x22,0x40,0xAB,0xDE}; + static const char* ascii_fingerprint; // = "B4CB1E4F8F8F4D50183DD372AD11753A"; + static const uint8_t binary_fingerprint[16]; // = {0xB4,0xCB,0x1E,0x4F,0x8F,0x8F,0x4D,0x50,0x18,0x3D,0xD3,0x72,0xAD,0x11,0x75,0x3A}; - TFetchResultsReq() : orientation((TFetchOrientation::type)0), maxRows(0) { + TFetchResultsReq() : orientation((TFetchOrientation::type)0), maxRows(0), fetchType(0) { orientation = (TFetchOrientation::type)0; } @@ -3619,7 +3623,10 @@ TOperationHandle operationHandle; TFetchOrientation::type orientation; int64_t maxRows; + int16_t fetchType; + _TFetchResultsReq__isset __isset; + void __set_operationHandle(const TOperationHandle& val) { operationHandle = val; } @@ -3632,6 +3639,11 @@ maxRows = val; } + void __set_fetchType(const int16_t val) { + fetchType = val; + __isset.fetchType = true; + } + bool operator == (const TFetchResultsReq & rhs) const { if (!(operationHandle == rhs.operationHandle)) @@ -3640,6 +3652,10 @@ return false; if (!(maxRows == rhs.maxRows)) return false; + if (__isset.fetchType != rhs.__isset.fetchType) + return false; + else if (__isset.fetchType && !(fetchType == rhs.fetchType)) + return false; return true; } bool operator != (const TFetchResultsReq &rhs) const { Index: service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java =================================================================== --- service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java (revision 1620975) +++ service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java (working copy) @@ -37,6 +37,7 @@ private static final org.apache.thrift.protocol.TField OPERATION_HANDLE_FIELD_DESC = new org.apache.thrift.protocol.TField("operationHandle", org.apache.thrift.protocol.TType.STRUCT, (short)1); private static final org.apache.thrift.protocol.TField ORIENTATION_FIELD_DESC = new org.apache.thrift.protocol.TField("orientation", org.apache.thrift.protocol.TType.I32, (short)2); private static final org.apache.thrift.protocol.TField MAX_ROWS_FIELD_DESC = new org.apache.thrift.protocol.TField("maxRows", org.apache.thrift.protocol.TType.I64, (short)3); + private static final org.apache.thrift.protocol.TField FETCH_TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("fetchType", org.apache.thrift.protocol.TType.I16, (short)4); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -47,6 +48,7 @@ private TOperationHandle operationHandle; // required private TFetchOrientation orientation; // required private long maxRows; // required + private short fetchType; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -56,7 +58,8 @@ * @see TFetchOrientation */ ORIENTATION((short)2, "orientation"), - MAX_ROWS((short)3, "maxRows"); + MAX_ROWS((short)3, "maxRows"), + FETCH_TYPE((short)4, "fetchType"); private static final Map byName = new HashMap(); @@ -77,6 +80,8 @@ return ORIENTATION; case 3: // MAX_ROWS return MAX_ROWS; + case 4: // FETCH_TYPE + return FETCH_TYPE; default: return null; } @@ -118,7 +123,9 @@ // isset id assignments private static final int __MAXROWS_ISSET_ID = 0; + private static final int __FETCHTYPE_ISSET_ID = 1; private byte __isset_bitfield = 0; + private _Fields optionals[] = {_Fields.FETCH_TYPE}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -128,6 +135,8 @@ new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, TFetchOrientation.class))); tmpMap.put(_Fields.MAX_ROWS, new org.apache.thrift.meta_data.FieldMetaData("maxRows", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.FETCH_TYPE, new org.apache.thrift.meta_data.FieldMetaData("fetchType", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I16))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TFetchResultsReq.class, metaDataMap); } @@ -135,6 +144,8 @@ public TFetchResultsReq() { this.orientation = org.apache.hive.service.cli.thrift.TFetchOrientation.FETCH_NEXT; + this.fetchType = (short)0; + } public TFetchResultsReq( @@ -161,6 +172,7 @@ this.orientation = other.orientation; } this.maxRows = other.maxRows; + this.fetchType = other.fetchType; } public TFetchResultsReq deepCopy() { @@ -174,6 +186,8 @@ setMaxRowsIsSet(false); this.maxRows = 0; + this.fetchType = (short)0; + } public TOperationHandle getOperationHandle() { @@ -252,6 +266,28 @@ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __MAXROWS_ISSET_ID, value); } + public short getFetchType() { + return this.fetchType; + } + + public void setFetchType(short fetchType) { + this.fetchType = fetchType; + setFetchTypeIsSet(true); + } + + public void unsetFetchType() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __FETCHTYPE_ISSET_ID); + } + + /** Returns true if field fetchType is set (has been assigned a value) and false otherwise */ + public boolean isSetFetchType() { + return EncodingUtils.testBit(__isset_bitfield, __FETCHTYPE_ISSET_ID); + } + + public void setFetchTypeIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __FETCHTYPE_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case OPERATION_HANDLE: @@ -278,6 +314,14 @@ } break; + case FETCH_TYPE: + if (value == null) { + unsetFetchType(); + } else { + setFetchType((Short)value); + } + break; + } } @@ -292,6 +336,9 @@ case MAX_ROWS: return Long.valueOf(getMaxRows()); + case FETCH_TYPE: + return Short.valueOf(getFetchType()); + } throw new IllegalStateException(); } @@ -309,6 +356,8 @@ return isSetOrientation(); case MAX_ROWS: return isSetMaxRows(); + case FETCH_TYPE: + return isSetFetchType(); } throw new IllegalStateException(); } @@ -353,6 +402,15 @@ return 
false; } + boolean this_present_fetchType = true && this.isSetFetchType(); + boolean that_present_fetchType = true && that.isSetFetchType(); + if (this_present_fetchType || that_present_fetchType) { + if (!(this_present_fetchType && that_present_fetchType)) + return false; + if (this.fetchType != that.fetchType) + return false; + } + return true; } @@ -375,6 +433,11 @@ if (present_maxRows) builder.append(maxRows); + boolean present_fetchType = true && (isSetFetchType()); + builder.append(present_fetchType); + if (present_fetchType) + builder.append(fetchType); + return builder.toHashCode(); } @@ -416,6 +479,16 @@ return lastComparison; } } + lastComparison = Boolean.valueOf(isSetFetchType()).compareTo(typedOther.isSetFetchType()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetFetchType()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.fetchType, typedOther.fetchType); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -455,6 +528,12 @@ sb.append("maxRows:"); sb.append(this.maxRows); first = false; + if (isSetFetchType()) { + if (!first) sb.append(", "); + sb.append("fetchType:"); + sb.append(this.fetchType); + first = false; + } sb.append(")"); return sb.toString(); } @@ -540,6 +619,14 @@ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 4: // FETCH_TYPE + if (schemeField.type == org.apache.thrift.protocol.TType.I16) { + struct.fetchType = iprot.readI16(); + struct.setFetchTypeIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -566,6 +653,11 @@ oprot.writeFieldBegin(MAX_ROWS_FIELD_DESC); oprot.writeI64(struct.maxRows); oprot.writeFieldEnd(); + if (struct.isSetFetchType()) { + oprot.writeFieldBegin(FETCH_TYPE_FIELD_DESC); + oprot.writeI16(struct.fetchType); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -586,6 +678,14 @@ struct.operationHandle.write(oprot); oprot.writeI32(struct.orientation.getValue()); oprot.writeI64(struct.maxRows); + BitSet optionals = new BitSet(); + if (struct.isSetFetchType()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetFetchType()) { + oprot.writeI16(struct.fetchType); + } } @Override @@ -598,6 +698,11 @@ struct.setOrientationIsSet(true); struct.maxRows = iprot.readI64(); struct.setMaxRowsIsSet(true); + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.fetchType = iprot.readI16(); + struct.setFetchTypeIsSet(true); + } } } Index: service/src/gen/thrift/gen-py/TCLIService/ttypes.py =================================================================== --- service/src/gen/thrift/gen-py/TCLIService/ttypes.py (revision 1620975) +++ service/src/gen/thrift/gen-py/TCLIService/ttypes.py (working copy) @@ -5752,6 +5752,7 @@ - operationHandle - orientation - maxRows + - fetchType """ thrift_spec = ( @@ -5759,12 +5760,14 @@ (1, TType.STRUCT, 'operationHandle', (TOperationHandle, TOperationHandle.thrift_spec), None, ), # 1 (2, TType.I32, 'orientation', None, 0, ), # 2 (3, TType.I64, 'maxRows', None, None, ), # 3 + (4, TType.I16, 'fetchType', None, 0, ), # 4 ) - def __init__(self, operationHandle=None, orientation=thrift_spec[2][4], maxRows=None,): + def __init__(self, operationHandle=None, orientation=thrift_spec[2][4], maxRows=None, fetchType=thrift_spec[4][4],): self.operationHandle = operationHandle self.orientation = orientation 
self.maxRows = maxRows + self.fetchType = fetchType def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -5791,6 +5794,11 @@ self.maxRows = iprot.readI64(); else: iprot.skip(ftype) + elif fid == 4: + if ftype == TType.I16: + self.fetchType = iprot.readI16(); + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -5813,6 +5821,10 @@ oprot.writeFieldBegin('maxRows', TType.I64, 3) oprot.writeI64(self.maxRows) oprot.writeFieldEnd() + if self.fetchType is not None: + oprot.writeFieldBegin('fetchType', TType.I16, 4) + oprot.writeI16(self.fetchType) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() Index: service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb =================================================================== --- service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb (revision 1620975) +++ service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb (working copy) @@ -1598,11 +1598,13 @@ OPERATIONHANDLE = 1 ORIENTATION = 2 MAXROWS = 3 + FETCHTYPE = 4 FIELDS = { OPERATIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'operationHandle', :class => ::TOperationHandle}, ORIENTATION => {:type => ::Thrift::Types::I32, :name => 'orientation', :default => 0, :enum_class => ::TFetchOrientation}, - MAXROWS => {:type => ::Thrift::Types::I64, :name => 'maxRows'} + MAXROWS => {:type => ::Thrift::Types::I64, :name => 'maxRows'}, + FETCHTYPE => {:type => ::Thrift::Types::I16, :name => 'fetchType', :default => 0, :optional => true} } def struct_fields; FIELDS; end Index: service/src/java/org/apache/hive/service/cli/CLIService.java =================================================================== --- service/src/java/org/apache/hive/service/cli/CLIService.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/CLIService.java (working copy) @@ -420,25 +420,20 @@ } /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long) + * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle) */ @Override - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) + public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { - RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle) - .getParentSession().fetchResults(opHandle, orientation, maxRows); - LOG.debug(opHandle + ": fetchResults()"); - return rowSet; + return fetchResults(opHandle, Operation.DEFAULT_FETCH_ORIENTATION, + Operation.DEFAULT_FETCH_MAX_ROWS, FetchType.QUERY_OUTPUT); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle) - */ @Override - public RowSet fetchResults(OperationHandle opHandle) - throws HiveSQLException { + public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException { RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle) - .getParentSession().fetchResults(opHandle); + .getParentSession().fetchResults(opHandle, orientation, maxRows, fetchType); LOG.debug(opHandle + ": fetchResults()"); return rowSet; } Index: service/src/java/org/apache/hive/service/cli/CLIServiceClient.java 
=================================================================== --- service/src/java/org/apache/hive/service/cli/CLIServiceClient.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/CLIServiceClient.java (working copy) @@ -28,19 +28,17 @@ * */ public abstract class CLIServiceClient implements ICLIService { + private static final long DEFAULT_MAX_ROWS = 1000; public SessionHandle openSession(String username, String password) throws HiveSQLException { return openSession(username, password, Collections.emptyMap()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle) - */ @Override public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { // TODO: provide STATIC default value - return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000); + return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, DEFAULT_MAX_ROWS, FetchType.QUERY_OUTPUT); } @Override Index: service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java =================================================================== --- service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java (working copy) @@ -181,13 +181,10 @@ return cliService.getResultSetMetadata(opHandle); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long) - */ @Override - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) - throws HiveSQLException { - return cliService.fetchResults(opHandle, orientation, maxRows); + public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException { + return cliService.fetchResults(opHandle, orientation, maxRows, fetchType); } Index: service/src/java/org/apache/hive/service/cli/FetchType.java =================================================================== --- service/src/java/org/apache/hive/service/cli/FetchType.java (revision 0) +++ service/src/java/org/apache/hive/service/cli/FetchType.java (working copy) @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli; + +/** + * FetchType indicates the type of fetchResults request. + * It maps the TFetchType, which is generated from Thrift interface. 
+ */ +public enum FetchType { + QUERY_OUTPUT((short)0), + LOG((short)1); + + private final short tFetchType; + + FetchType(short tFetchType) { + this.tFetchType = tFetchType; + } + + public static FetchType getFetchType(short tFetchType) { + for (FetchType fetchType : values()) { + if (tFetchType == fetchType.toTFetchType()) { + return fetchType; + } + } + return QUERY_OUTPUT; + } + + public short toTFetchType() { + return tFetchType; + } +} Index: service/src/java/org/apache/hive/service/cli/ICLIService.java =================================================================== --- service/src/java/org/apache/hive/service/cli/ICLIService.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/ICLIService.java (working copy) @@ -27,79 +27,78 @@ public interface ICLIService { - public abstract SessionHandle openSession(String username, String password, + SessionHandle openSession(String username, String password, Map configuration) throws HiveSQLException; - public abstract SessionHandle openSessionWithImpersonation(String username, String password, + SessionHandle openSessionWithImpersonation(String username, String password, Map configuration, String delegationToken) throws HiveSQLException; - public abstract void closeSession(SessionHandle sessionHandle) + void closeSession(SessionHandle sessionHandle) throws HiveSQLException; - public abstract GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType infoType) + GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType infoType) throws HiveSQLException; - public abstract OperationHandle executeStatement(SessionHandle sessionHandle, String statement, + OperationHandle executeStatement(SessionHandle sessionHandle, String statement, Map confOverlay) throws HiveSQLException; - public abstract OperationHandle executeStatementAsync(SessionHandle sessionHandle, + OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, Map confOverlay) throws HiveSQLException; - public abstract OperationHandle getTypeInfo(SessionHandle sessionHandle) + OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException; - public abstract OperationHandle getCatalogs(SessionHandle sessionHandle) + OperationHandle getCatalogs(SessionHandle sessionHandle) throws HiveSQLException; - public abstract OperationHandle getSchemas(SessionHandle sessionHandle, + OperationHandle getSchemas(SessionHandle sessionHandle, String catalogName, String schemaName) throws HiveSQLException; - public abstract OperationHandle getTables(SessionHandle sessionHandle, + OperationHandle getTables(SessionHandle sessionHandle, String catalogName, String schemaName, String tableName, List tableTypes) throws HiveSQLException; - public abstract OperationHandle getTableTypes(SessionHandle sessionHandle) + OperationHandle getTableTypes(SessionHandle sessionHandle) throws HiveSQLException; - public abstract OperationHandle getColumns(SessionHandle sessionHandle, + OperationHandle getColumns(SessionHandle sessionHandle, String catalogName, String schemaName, String tableName, String columnName) throws HiveSQLException; - public abstract OperationHandle getFunctions(SessionHandle sessionHandle, + OperationHandle getFunctions(SessionHandle sessionHandle, String catalogName, String schemaName, String functionName) throws HiveSQLException; - public abstract OperationStatus getOperationStatus(OperationHandle opHandle) + OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException; - public abstract void 
cancelOperation(OperationHandle opHandle) + void cancelOperation(OperationHandle opHandle) throws HiveSQLException; - public abstract void closeOperation(OperationHandle opHandle) + void closeOperation(OperationHandle opHandle) throws HiveSQLException; - public abstract TableSchema getResultSetMetadata(OperationHandle opHandle) + TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException; - public abstract RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, - long maxRows) - throws HiveSQLException; - - public abstract RowSet fetchResults(OperationHandle opHandle) + RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException; - public abstract String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, + RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException; + + String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, String owner, String renewer) throws HiveSQLException; - public abstract void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, + void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException; - public abstract void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, + void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException; Index: service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java =================================================================== --- service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java (working copy) @@ -42,11 +42,8 @@ rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.Operation#run() - */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); setState(OperationState.FINISHED); } Index: service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java =================================================================== --- service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java (working copy) @@ -114,11 +114,8 @@ this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.Operation#run() - */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient(); Index: service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java =================================================================== --- service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java (working copy) @@ -68,11 +68,8 @@ this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.Operation#run() - */ 
@Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { if ((null == catalogName || "".equals(catalogName)) Index: service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java =================================================================== --- service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java (working copy) @@ -50,11 +50,8 @@ this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.Operation#run() - */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient(); Index: service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java =================================================================== --- service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java (working copy) @@ -50,11 +50,8 @@ rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.Operation#run() - */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { for (TableType type : TableType.values()) { Index: service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java =================================================================== --- service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java (working copy) @@ -71,11 +71,8 @@ this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.Operation#run() - */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient(); Index: service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java =================================================================== --- service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java (working copy) @@ -79,11 +79,8 @@ rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.Operation#run() - */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { for (Type type : Type.values()) { Index: service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java =================================================================== --- service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java (working copy) @@ -94,11 +94,8 @@ 
     IOUtils.cleanup(LOG, parentSession.getSessionState().err);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hive.service.cli.operation.Operation#run()
-   */
   @Override
-  public void run() throws HiveSQLException {
+  public void runInternal() throws HiveSQLException {
     setState(OperationState.RUNNING);
     try {
       String command = getStatement().trim();
@@ -136,6 +133,7 @@
     setState(OperationState.CLOSED);
     tearDownSessionIO();
     cleanTmpFile();
+    cleanupOperationLog();
   }
 
   /* (non-Javadoc)
Index: service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java
===================================================================
--- service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java (revision 0)
+++ service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java (working copy)
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.operation;
+import java.io.CharArrayWriter;
+
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.WriterAppender;
+import org.apache.log4j.spi.Filter;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * An Appender to divert logs from individual threads to the OperationLog they belong to.
+ */
+public class LogDivertAppender extends WriterAppender {
+  private static final Logger LOG = Logger.getLogger(LogDivertAppender.class.getName());
+  private final OperationManager operationManager;
+
+  /**
+   * A log filter that excludes messages coming from the logger with the given name.
+   * We apply this filter on the Loggers used by the log diversion stuff, so that
+   * they don't generate more logs for themselves when they process logs.
+   */
+  private static class NameExclusionFilter extends Filter {
+    private String excludeLoggerName = null;
+
+    public NameExclusionFilter(String excludeLoggerName) {
+      this.excludeLoggerName = excludeLoggerName;
+    }
+
+    @Override
+    public int decide(LoggingEvent ev) {
+      if (ev.getLoggerName().equals(excludeLoggerName)) {
+        return Filter.DENY;
+      }
+      return Filter.NEUTRAL;
+    }
+  }
+
+  /** This is where the log message will go */
+  private final CharArrayWriter writer = new CharArrayWriter();
+
+  public LogDivertAppender(Layout layout, OperationManager operationManager) {
+    setLayout(layout);
+    setWriter(writer);
+    setName("LogDivertAppender");
+    this.operationManager = operationManager;
+
+    // Filter out messages coming from log processing classes, or we'll run an infinite loop.
+    addFilter(new NameExclusionFilter(LOG.getName()));
+    addFilter(new NameExclusionFilter(OperationLog.class.getName()));
+    addFilter(new NameExclusionFilter(OperationManager.class.getName()));
+  }
+
+  /**
+   * Overrides WriterAppender.subAppend(), which does the real logging.
+ * No need to worry about concurrency since log4j calls this synchronously. + */ + @Override + protected void subAppend(LoggingEvent event) { + super.subAppend(event); + // That should've gone into our writer. Notify the LogContext. + String logOutput = writer.toString(); + writer.reset(); + + OperationLog log = operationManager.getOperationLogByThread(); + if (log == null) { + LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName()); + return; + } + log.writeOperationLog(logOutput); + } +} Index: service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java =================================================================== --- service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java (working copy) @@ -46,6 +46,7 @@ @Override public void close() throws HiveSQLException { setState(OperationState.CLOSED); + cleanupOperationLog(); } /** Index: service/src/java/org/apache/hive/service/cli/operation/Operation.java =================================================================== --- service/src/java/org/apache/hive/service/cli/operation/Operation.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/operation/Operation.java (working copy) @@ -17,6 +17,8 @@ */ package org.apache.hive.service.cli.operation; +import java.io.File; +import java.io.FileNotFoundException; import java.util.EnumSet; import java.util.concurrent.Future; @@ -41,11 +43,14 @@ private final OperationHandle opHandle; private HiveConf configuration; public static final Log LOG = LogFactory.getLog(Operation.class.getName()); + public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = FetchOrientation.FETCH_NEXT; public static final long DEFAULT_FETCH_MAX_ROWS = 100; protected boolean hasResultSet; protected volatile HiveSQLException operationException; protected final boolean runAsync; protected volatile Future backgroundHandle; + protected OperationLog operationLog; + protected boolean isOperationLogEnabled; protected static final EnumSet DEFAULT_FETCH_ORIENTATION_SET = EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST); @@ -106,6 +111,11 @@ opHandle.setHasResultSet(hasResultSet); } + + public OperationLog getOperationLog() { + return operationLog; + } + protected final OperationState setState(OperationState newState) throws HiveSQLException { state.validateTransition(newState); this.state = newState; @@ -138,8 +148,98 @@ return OperationState.ERROR.equals(state); } - public abstract void run() throws HiveSQLException; + protected void createOperationLog() { + if (parentSession.isOperationLogEnabled()) { + File operationLogFile = new File(parentSession.getOperationLogSessionDir(), + opHandle.getHandleIdentifier().toString()); + isOperationLogEnabled = true; + // create log file + try { + if (operationLogFile.exists()) { + LOG.warn("The operation log file should not exist, but it is already there: " + + operationLogFile.getAbsolutePath()); + operationLogFile.delete(); + } + if (!operationLogFile.createNewFile()) { + // the log file already exists and cannot be deleted. + // If it can be read/written, keep its contents and use it. 
+        if (!operationLogFile.canRead() || !operationLogFile.canWrite()) {
+          LOG.warn("The existing operation log file cannot be recreated, "
+              + "and it cannot be read or written: " + operationLogFile.getAbsolutePath());
+          isOperationLogEnabled = false;
+          return;
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Unable to create operation log file: " + operationLogFile.getAbsolutePath(), e);
+      isOperationLogEnabled = false;
+      return;
+    }
+
+    // create the OperationLog object with the above log file
+    try {
+      operationLog = new OperationLog(opHandle.toString(), operationLogFile);
+    } catch (FileNotFoundException e) {
+      LOG.warn("Unable to instantiate OperationLog object for operation: " +
+          opHandle, e);
+      isOperationLogEnabled = false;
+      return;
+    }
+
+    // register this operationLog with the current thread
+    OperationLog.setCurrentOperationLog(operationLog);
+    }
+  }
+
+  protected void unregisterOperationLog() {
+    if (isOperationLogEnabled) {
+      OperationLog.removeCurrentOperationLog();
+    }
+  }
+
+  /**
+   * Invoked before runInternal().
+   * Set up any preconditions or configuration.
+   */
+  protected void beforeRun() {
+    createOperationLog();
+  }
+
+  /**
+   * Invoked after runInternal(), even if an exception is thrown in runInternal().
+   * Clean up resources, which were set up in beforeRun().
+   */
+  protected void afterRun() {
+    unregisterOperationLog();
+  }
+
+  /**
+   * Implemented by subclasses of Operation to execute their specific behaviors.
+   * @throws HiveSQLException
+   */
+  protected abstract void runInternal() throws HiveSQLException;
+
+  public void run() throws HiveSQLException {
+    beforeRun();
+    try {
+      runInternal();
+    } finally {
+      afterRun();
+    }
+  }
+
+  protected void cleanupOperationLog() {
+    if (isOperationLogEnabled) {
+      if (operationLog == null) {
+        LOG.error("Operation [ " + opHandle.getHandleIdentifier() + " ] " +
+            "logging is enabled, but its OperationLog object cannot be found.");
+      } else {
+        operationLog.close();
+      }
+    }
+  }
+
   // TODO: make this abstract and implement in subclasses.
   public void cancel() throws HiveSQLException {
     setState(OperationState.CANCELED);
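The run() template above is the heart of the log capture: it registers the operation's OperationLog in a ThreadLocal before runInternal(), and the shared LogDivertAppender (above) later looks that ThreadLocal up to decide where a log event emitted on the current thread should go. A minimal, self-contained sketch of this diversion pattern follows; the class and names in it are illustrative stand-ins, not part of the patch:

```java
import java.io.CharArrayWriter;

// Sketch of the per-thread log diversion used by this patch; all names
// here are hypothetical, not Hive classes.
public class ThreadLocalDivertSketch {
  // plays the role of OperationLog's THREAD_LOCAL_OPERATION_LOG
  static final ThreadLocal<CharArrayWriter> CURRENT = new ThreadLocal<CharArrayWriter>();

  // plays the role of LogDivertAppender.subAppend(): route the formatted
  // message to whatever buffer the current thread has registered
  static void append(String formattedMessage) {
    CharArrayWriter sink = CURRENT.get();
    if (sink == null) {
      return; // no operation registered on this thread, drop the event
    }
    sink.append(formattedMessage);
  }

  public static void main(String[] args) throws InterruptedException {
    final CharArrayWriter opLog = new CharArrayWriter();
    Thread worker = new Thread(new Runnable() {
      public void run() {
        CURRENT.set(opLog);   // like OperationLog.setCurrentOperationLog(...)
        try {
          append("map 100% reduce 100%\n");
        } finally {
          CURRENT.remove();   // like OperationLog.removeCurrentOperationLog()
        }
      }
    });
    worker.start();
    worker.join();
    System.out.print(opLog.toString()); // only this operation's output
  }
}
```

This is also why SQLOperation (further below) repeats the set/remove pair inside its background Runnable: a ThreadLocal registered on the handler thread is invisible to the async thread, so the background path must re-register the same OperationLog itself.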
Index: service/src/java/org/apache/hive/service/cli/operation/OperationLog.java
===================================================================
--- service/src/java/org/apache/hive/service/cli/operation/OperationLog.java (revision 0)
+++ service/src/java/org/apache/hive/service/cli/operation/OperationLog.java (working copy)
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.cli.operation;
+
+import com.google.common.base.Charsets;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * OperationLog wraps the actual operation log file, and provides an interface
+ * for accessing, reading, writing, and removing the file.
+ */
+public class OperationLog {
+  private static final Log LOG = LogFactory.getLog(OperationLog.class.getName());
+
+  private final String operationName;
+  private final LogFile logFile;
+
+  public OperationLog(String name, File file) throws FileNotFoundException {
+    operationName = name;
+    logFile = new LogFile(file);
+  }
+
+  /**
+   * Singleton OperationLog object per thread.
+   */
+  private static final ThreadLocal<OperationLog> THREAD_LOCAL_OPERATION_LOG = new
+      ThreadLocal<OperationLog>() {
+    @Override
+    protected synchronized OperationLog initialValue() {
+      return null;
+    }
+  };
+
+  public static void setCurrentOperationLog(OperationLog operationLog) {
+    THREAD_LOCAL_OPERATION_LOG.set(operationLog);
+  }
+
+  public static OperationLog getCurrentOperationLog() {
+    return THREAD_LOCAL_OPERATION_LOG.get();
+  }
+
+  public static void removeCurrentOperationLog() {
+    THREAD_LOCAL_OPERATION_LOG.remove();
+  }
+
+  /**
+   * Write operation execution logs into the log file
+   * @param operationLogMessage one line of log emitted from log4j
+   */
+  public void writeOperationLog(String operationLogMessage) {
+    logFile.write(operationLogMessage);
+  }
+
+  /**
+   * Read operation execution logs from the log file
+   * @param fetchOrientation one of the FetchOrientation values
+   * @param maxRows the max number of fetched lines from the log
+   * @return the fetched log lines
+   * @throws HiveSQLException
+   */
+  public List<String> readOperationLog(FetchOrientation fetchOrientation, long maxRows)
+      throws HiveSQLException {
+    return logFile.read(fetchOrientation, maxRows);
+  }
+
+  /**
+   * Close this OperationLog when the operation is closed. The log file will be removed.
+   */
+  public void close() {
+    logFile.remove();
+  }
+
+  /**
+   * Wrapper for reading and writing the operation log file
+   */
+  private class LogFile {
+    private File file;
+    private BufferedReader in;
+    private PrintStream out;
+    private volatile boolean isRemoved;
+
+    LogFile(File file) throws FileNotFoundException {
+      this.file = file;
+      in = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
+      out = new PrintStream(new FileOutputStream(file));
+      isRemoved = false;
+    }
+
+    synchronized void write(String msg) {
+      // write log to the file
+      out.print(msg);
+    }
+
+    synchronized List<String> read(FetchOrientation fetchOrientation, long maxRows)
+        throws HiveSQLException {
+      // reset the BufferedReader if fetching from the beginning of the file
+      if (fetchOrientation.equals(FetchOrientation.FETCH_FIRST)) {
+        resetIn();
+      }
+
+      return readResults(maxRows);
+    }
+
+    void remove() {
+      try {
+        FileUtils.forceDelete(file);
+        isRemoved = true;
+      } catch (Exception e) {
+        LOG.error("Failed to remove corresponding log file of operation: " + operationName, e);
+      }
+    }
+
+    private void resetIn() {
+      if (in != null) {
+        IOUtils.cleanup(LOG, in);
+        in = null;
+      }
+    }
+
+    private List<String> readResults(long nLines) throws HiveSQLException {
+      if (in == null) {
+        try {
+          in = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
+        } catch (FileNotFoundException e) {
+          if (isRemoved) {
+            throw new HiveSQLException("The operation has been closed and its log file " +
+                file.getAbsolutePath() + " has been removed.", e);
+          } else {
+            throw new HiveSQLException("Operation Log file " + file.getAbsolutePath() +
+                " is not found.", e);
+          }
+        }
+      }
+
+      List<String> logs = new ArrayList<String>();
+      String line = "";
+      // if nLines <= 0, read all lines in the log file.
+      for (int i = 0; i < nLines || nLines <= 0; i++) {
+        try {
+          line = in.readLine();
+          if (line == null) {
+            break;
+          } else {
+            logs.add(line);
+          }
+        } catch (IOException e) {
+          if (isRemoved) {
+            throw new HiveSQLException("The operation has been closed and its log file " +
+                file.getAbsolutePath() + " has been removed.", e);
+          } else {
+            throw new HiveSQLException("Reading operation log file encountered an exception: ", e);
+          }
+        }
+      }
+      return logs;
+    }
+  }
+}
Index: service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
===================================================================
--- service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (revision 1620975)
+++ service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (working copy)
@@ -18,6 +18,7 @@
 
 package org.apache.hive.service.cli.operation;
 
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -25,22 +26,19 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hive.service.AbstractService;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.OperationState;
-import org.apache.hive.service.cli.OperationStatus;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
 import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.log4j.*;
 
 /**
 *
OperationManager. * */ public class OperationManager extends AbstractService { - + private static final String DEFAULT_LAYOUT_PATTERN = "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n"; private final Log LOG = LogFactory.getLog(OperationManager.class.getName()); private HiveConf hiveConf; @@ -54,7 +52,11 @@ @Override public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; - + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { + initOperationLogCapture(); + } else { + LOG.debug("Operation level logging is turned off"); + } super.init(hiveConf); } @@ -70,6 +72,30 @@ super.stop(); } + private void initOperationLogCapture() { + // There should be a ConsoleAppender. Copy its Layout. + Logger root = Logger.getRootLogger(); + Layout layout = null; + + Enumeration appenders = root.getAllAppenders(); + while (appenders.hasMoreElements()) { + Appender ap = (Appender) appenders.nextElement(); + if (ap.getClass().equals(ConsoleAppender.class)) { + layout = ap.getLayout(); + break; + } + } + + if (layout == null) { + layout = new PatternLayout(DEFAULT_LAYOUT_PATTERN); + LOG.info("Cannot find a Layout from a ConsoleAppender. Using default Layout pattern."); + } + + // Register another Appender (with the same layout) that talks to us. + Appender ap = new LogDivertAppender(layout, this); + root.addAppender(ap); + } + public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, String statement, Map confOverlay, boolean runAsync) throws HiveSQLException { @@ -191,4 +217,39 @@ throws HiveSQLException { return getOperation(opHandle).getNextRowSet(orientation, maxRows); } + + public RowSet getOperationLogRowSet(OperationHandle opHandle, + FetchOrientation orientation, long maxRows) + throws HiveSQLException { + // get the OperationLog object from the operation + OperationLog operationLog = getOperation(opHandle).getOperationLog(); + if (operationLog == null) { + throw new HiveSQLException("Couldn't find log associated with operation handle: " + opHandle); + } + + // read logs + List logs = operationLog.readOperationLog(orientation, maxRows); + + // convert logs to RowSet + TableSchema tableSchema = new TableSchema(getLogSchema()); + RowSet rowSet = RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion()); + for (String log : logs) { + rowSet.addRow(new String[] {log}); + } + + return rowSet; + } + + private Schema getLogSchema() { + Schema schema = new Schema(); + FieldSchema fieldSchema = new FieldSchema(); + fieldSchema.setName("operation_log"); + fieldSchema.setType("string"); + schema.addToFieldSchemas(fieldSchema); + return schema; + } + + public OperationLog getOperationLogByThread() { + return OperationLog.getCurrentOperationLog(); + } } Index: service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java =================================================================== --- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (working copy) @@ -135,7 +135,7 @@ } } - private void runInternal(HiveConf sqlOperationConf) throws HiveSQLException { + private void runQuery(HiveConf sqlOperationConf) throws HiveSQLException { try { // In Hive server mode, we are not able to retry in the FetchTask // case, when calling fetch queries since execute() has returned. 
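OperationManager.getOperationLogRowSet() (above) turns buffered log lines into a one-column ("operation_log") RowSet, and HiveSessionImpl (further below) routes fetchResults() to it whenever fetchType is LOG. A usage sketch of the resulting client API follows; the method name, statement, and polling interval are illustrative, while the cli-package types come from this patch:

```java
// Poll the operation log of an async query via the new four-argument
// fetchResults(), then read the query output once the operation finishes.
void runAndTailLogs(ICLIService client, SessionHandle session) throws Exception {
  OperationHandle op = client.executeStatementAsync(session, "SELECT 1",
      new java.util.HashMap<String, String>());
  OperationState state;
  do {
    // fetchType = LOG returns rows of the single "operation_log" string column
    RowSet logRows = client.fetchResults(op, FetchOrientation.FETCH_NEXT, 1000, FetchType.LOG);
    System.out.println(logRows.numRows() + " new log lines");
    Thread.sleep(500); // illustrative polling interval
    state = client.getOperationStatus(op).getState();
  } while (state == OperationState.PENDING || state == OperationState.RUNNING);
  // once the query is done, fetch the actual results as before
  RowSet results = client.fetchResults(op, FetchOrientation.FETCH_NEXT, 1000,
      FetchType.QUERY_OUTPUT);
  client.closeOperation(op);
}
```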
@@ -165,12 +165,12 @@ } @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.PENDING); final HiveConf opConfig = getConfigForOperation(); prepare(opConfig); if (!shouldRunAsync()) { - runInternal(opConfig); + runQuery(opConfig); } else { // We'll pass ThreadLocals in the background thread from the foreground (handler) thread final SessionState parentSessionState = SessionState.get(); @@ -190,11 +190,15 @@ public Object run() throws HiveSQLException { Hive.set(parentHive); SessionState.setCurrentSessionState(parentSessionState); + // Set current OperationLog in this async thread for keeping on saving query log. + registerCurrentOperationLog(); try { - runInternal(opConfig); + runQuery(opConfig); } catch (HiveSQLException e) { setOperationException(e); LOG.error("Error running hive query: ", e); + } finally { + unregisterOperationLog(); } return null; } @@ -260,6 +264,18 @@ } } + private void registerCurrentOperationLog() { + if (isOperationLogEnabled) { + if (operationLog == null) { + LOG.warn("Failed to get current OperationLog object of Operation: " + + getHandle().getHandleIdentifier()); + isOperationLogEnabled = false; + return; + } + OperationLog.setCurrentOperationLog(operationLog); + } + } + private void cleanup(OperationState state) throws HiveSQLException { setState(state); if (shouldRunAsync()) { @@ -288,6 +304,7 @@ @Override public void close() throws HiveSQLException { cleanup(OperationState.CLOSED); + cleanupOperationLog(); } @Override Index: service/src/java/org/apache/hive/service/cli/session/HiveSession.java =================================================================== --- service/src/java/org/apache/hive/service/cli/session/HiveSession.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/session/HiveSession.java (working copy) @@ -23,13 +23,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hive.service.auth.HiveAuthFactory; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.GetInfoType; -import org.apache.hive.service.cli.GetInfoValue; -import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.*; public interface HiveSession extends HiveSessionBase { @@ -144,11 +138,9 @@ public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException; - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) - throws HiveSQLException; + public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException; - public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException; - public String getDelegationToken(HiveAuthFactory authFactory, String owner, String renewer) throws HiveSQLException; Index: service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java =================================================================== --- service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java (working copy) @@ -24,6 +24,7 @@ import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; +import java.io.File; import 
java.util.Map; /** @@ -38,40 +39,57 @@ * Set the session manager for the session * @param sessionManager */ - public void setSessionManager(SessionManager sessionManager); + void setSessionManager(SessionManager sessionManager); /** * Get the session manager for the session */ - public SessionManager getSessionManager(); + SessionManager getSessionManager(); /** * Set operation manager for the session * @param operationManager */ - public void setOperationManager(OperationManager operationManager); + void setOperationManager(OperationManager operationManager); /** * Initialize the session * @param sessionConfMap */ - public void initialize(Map sessionConfMap) throws Exception; + void initialize(Map sessionConfMap) throws Exception; - public SessionHandle getSessionHandle(); + /** + * Check whether operation logging is enabled and session dir is created successfully + */ + boolean isOperationLogEnabled(); - public String getUsername(); + /** + * Get the session dir, which is the parent dir of operation logs + * @return a file representing the parent directory of operation logs + */ + File getOperationLogSessionDir(); - public String getPassword(); + /** + * Set the session dir, which is the parent dir of operation logs + * @param operationLogRootDir the parent dir of the session dir + */ + void setOperationLogSessionDir(File operationLogRootDir); - public HiveConf getHiveConf(); + SessionHandle getSessionHandle(); - public SessionState getSessionState(); + String getUsername(); - public String getUserName(); + String getPassword(); - public void setUserName(String userName); + HiveConf getHiveConf(); - public String getIpAddress(); + SessionState getSessionState(); - public void setIpAddress(String ipAddress); + String getUserName(); + + void setUserName(String userName); + + String getIpAddress(); + + void setIpAddress(String ipAddress); } Index: service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java =================================================================== --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (revision 1620975) +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (working copy) @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.cli.HiveFileProcessor; @@ -44,14 +45,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hive.service.auth.HiveAuthFactory; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.GetInfoType; -import org.apache.hive.service.cli.GetInfoValue; -import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.SessionHandle; -import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.*; import org.apache.hive.service.cli.operation.ExecuteStatementOperation; import org.apache.hive.service.cli.operation.GetCatalogsOperation; import org.apache.hive.service.cli.operation.GetColumnsOperation; @@ -87,6 +81,8 @@ private OperationManager operationManager; private IMetaStoreClient metastoreClient = null; private final Set opHandleSet = new HashSet(); + private boolean isOperationLogEnabled; + private File sessionLogDir; public HiveSessionImpl(TProtocolVersion 
Index: service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
===================================================================
--- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java	(revision 1620975)
+++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java	(working copy)
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.Set;

+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.cli.HiveFileProcessor;
@@ -44,14 +45,7 @@
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.common.util.HiveVersionInfo;
 import org.apache.hive.service.auth.HiveAuthFactory;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
 import org.apache.hive.service.cli.operation.GetCatalogsOperation;
 import org.apache.hive.service.cli.operation.GetColumnsOperation;
@@ -87,6 +81,8 @@
   private OperationManager operationManager;
   private IMetaStoreClient metastoreClient = null;
   private final Set opHandleSet = new HashSet();
+  private boolean isOperationLogEnabled;
+  private File sessionLogDir;

   public HiveSessionImpl(TProtocolVersion protocol, String username, String password,
       HiveConf serverhiveConf, String ipAddress) {
@@ -187,6 +183,34 @@
   }

   @Override
+  public void setOperationLogSessionDir(File operationLogRootDir) {
+    sessionLogDir = new File(operationLogRootDir, sessionHandle.getHandleIdentifier().toString());
+    isOperationLogEnabled = true;
+
+    if (!sessionLogDir.exists()) {
+      if (!sessionLogDir.mkdir()) {
+        LOG.warn("Unable to create operation log session directory: " +
+            sessionLogDir.getAbsolutePath());
+        isOperationLogEnabled = false;
+      }
+    }
+
+    if (isOperationLogEnabled) {
+      LOG.info("Operation log session directory is created: " + sessionLogDir.getAbsolutePath());
+    }
+  }
+
+  @Override
+  public boolean isOperationLogEnabled() {
+    return isOperationLogEnabled;
+  }
+
+  @Override
+  public File getOperationLogSessionDir() {
+    return sessionLogDir;
+  }
+
+  @Override
   public TProtocolVersion getProtocolVersion() {
     return sessionHandle.getProtocolVersion();
   }
@@ -497,6 +521,9 @@
       operationManager.closeOperation(opHandle);
     }
     opHandleSet.clear();
+    // Cleanup session log directory.
+    cleanupSessionLogDir();
+
     HiveHistory hiveHist = sessionState.getHiveHistory();
     if (null != hiveHist) {
       hiveHist.closeStream();
@@ -509,6 +536,16 @@
     }
   }

+  private void cleanupSessionLogDir() {
+    if (isOperationLogEnabled) {
+      try {
+        FileUtils.forceDelete(sessionLogDir);
+      } catch (Exception e) {
+        LOG.error("Failed to cleanup session log dir: " + sessionHandle, e);
+      }
+    }
+  }
+
   @Override
   public SessionState getSessionState() {
     return sessionState;
@@ -556,27 +593,22 @@
   }

   @Override
-  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
-      throws HiveSQLException {
+  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+      long maxRows, FetchType fetchType) throws HiveSQLException {
     acquire();
     try {
-      return sessionManager.getOperationManager()
-          .getOperationNextRowSet(opHandle, orientation, maxRows);
+      if (fetchType == FetchType.QUERY_OUTPUT) {
+        return sessionManager.getOperationManager()
+            .getOperationNextRowSet(opHandle, orientation, maxRows);
+      } else {
+        return sessionManager.getOperationManager()
+            .getOperationLogRowSet(opHandle, orientation, maxRows);
+      }
     } finally {
       release();
     }
   }

-  @Override
-  public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
-    acquire();
-    try {
-      return sessionManager.getOperationManager().getOperationNextRowSet(opHandle);
-    } finally {
-      release();
-    }
-  }
-
   protected HiveSession getSession() {
     return this;
   }
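The fetchType routing above relies on the new FetchType enum in org.apache.hive.service.cli, whose source file is not part of this excerpt. From its call sites here (toTFetchType() on the client side, getFetchType(short) on the server side) and the Thrift field comment (0 represents query output, 1 represents log), a minimal sketch consistent with those uses would be:

    // Sketch of org.apache.hive.service.cli.FetchType as implied by its call
    // sites; the actual file is outside this excerpt.
    public enum FetchType {
      QUERY_OUTPUT((short) 0),
      LOG((short) 1);

      private final short tFetchType;

      FetchType(short tFetchType) {
        this.tFetchType = tFetchType;
      }

      public short toTFetchType() {
        return tFetchType;
      }

      public static FetchType getFetchType(short tFetchType) {
        for (FetchType fetchType : values()) {
          if (fetchType.toTFetchType() == tFetchType) {
            return fetchType;
          }
        }
        // assumption: fall back to query output for unrecognized values,
        // matching the Thrift default of fetchType = 0
        return QUERY_OUTPUT;
      }
    }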
Index: service/src/java/org/apache/hive/service/cli/session/SessionManager.java
===================================================================
--- service/src/java/org/apache/hive/service/cli/session/SessionManager.java	(revision 1620975)
+++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java	(working copy)
@@ -18,6 +18,8 @@

 package org.apache.hive.service.cli.session;

+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,6 +28,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;

+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -53,6 +56,8 @@
       new ConcurrentHashMap();
   private final OperationManager operationManager = new OperationManager();
   private ThreadPoolExecutor backgroundOperationPool;
+  private boolean isOperationLogEnabled;
+  private File operationLogRootDir;

   public SessionManager() {
     super("SessionManager");
@@ -66,6 +71,10 @@
       throw new RuntimeException("Error applying authorization policy on hive configuration", e);
     }
     this.hiveConf = hiveConf;
+    // Create operation log root directory, if operation logging is enabled
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
+      initOperationLogRootDir();
+    }
     createBackgroundOperationPool();
     addService(operationManager);
     super.init(hiveConf);
@@ -97,6 +106,36 @@
     ss.applyAuthorizationPolicy();
   }

+  private void initOperationLogRootDir() {
+    operationLogRootDir = new File(
+        hiveConf.getVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION));
+    isOperationLogEnabled = true;
+
+    if (operationLogRootDir.exists() && !operationLogRootDir.isDirectory()) {
+      LOG.warn("The operation log root directory exists, but it is not a directory: " +
+          operationLogRootDir.getAbsolutePath());
+      isOperationLogEnabled = false;
+    }
+
+    if (!operationLogRootDir.exists()) {
+      if (!operationLogRootDir.mkdirs()) {
+        LOG.warn("Unable to create operation log root directory: " +
+            operationLogRootDir.getAbsolutePath());
+        isOperationLogEnabled = false;
+      }
+    }
+
+    if (isOperationLogEnabled) {
+      LOG.info("Operation log root directory is created: " + operationLogRootDir.getAbsolutePath());
+      try {
+        FileUtils.forceDeleteOnExit(operationLogRootDir);
+      } catch (IOException e) {
+        LOG.warn("Failed to schedule cleanup HS2 operation logging root dir: " +
+            operationLogRootDir.getAbsolutePath(), e);
+      }
+    }
+  }
+
   @Override
   public synchronized void start() {
     super.start();
@@ -115,8 +154,20 @@
             " seconds has been exceeded. RUNNING background operations will be shut down", e);
       }
     }
+    cleanupLoggingRootDir();
   }

+  private void cleanupLoggingRootDir() {
+    if (isOperationLogEnabled) {
+      try {
+        FileUtils.forceDelete(operationLogRootDir);
+      } catch (Exception e) {
+        LOG.warn("Failed to cleanup root dir of HS2 logging: " + operationLogRootDir
+            .getAbsolutePath(), e);
+      }
+    }
+  }
+
   public SessionHandle openSession(TProtocolVersion protocol, String username, String password,
       String ipAddress, Map sessionConf) throws HiveSQLException {
     return openSession(protocol, username, password, ipAddress, sessionConf, false, null);
@@ -138,6 +189,9 @@
     session.setOperationManager(operationManager);
     try {
       session.initialize(sessionConf);
+      if (isOperationLogEnabled) {
+        session.setOperationLogSessionDir(operationLogRootDir);
+      }
       session.open();
     } catch (Exception e) {
       throw new HiveSQLException("Failed to open new session", e);
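Taken together with HiveSessionImpl.setOperationLogSessionDir() above, the directory management works out to a three-level layout. The snippet below is illustrative only; the /tmp paths shown assume the default value of hive.server2.logging.operation.log.location:

    // Root dir, created by SessionManager.initOperationLogRootDir():
    //   e.g. /tmp/<user>/operation_logs
    File root = new File(hiveConf.getVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION));
    // Per-session dir, created by HiveSessionImpl.setOperationLogSessionDir():
    //   e.g. /tmp/<user>/operation_logs/<session handle UUID>
    File sessionDir = new File(root, sessionHandle.getHandleIdentifier().toString());
    // Per-operation log file, whose naming is asserted by TestOperationLoggingAPI below:
    //   e.g. /tmp/<user>/operation_logs/<session UUID>/<operation UUID>
    File operationLog = new File(sessionDir, operationHandle.getHandleIdentifier().toString());

Each level is cleaned up at the end of its own lifetime: the session dir in HiveSessionImpl.close(), and the root dir in SessionManager.stop(), with forceDeleteOnExit() as a safety net for unclean shutdowns.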
Index: service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
===================================================================
--- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java	(revision 1620975)
+++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java	(working copy)
@@ -32,16 +32,7 @@
 import org.apache.hive.service.AbstractService;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.TSetIpAddressProcessor;
-import org.apache.hive.service.cli.CLIService;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.OperationStatus;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
 import org.apache.hive.service.cli.session.SessionManager;
 import org.apache.thrift.TException;
 import org.apache.thrift.server.TServer;
@@ -534,7 +525,8 @@
       RowSet rowSet = cliService.fetchResults(
           new OperationHandle(req.getOperationHandle()),
           FetchOrientation.getFetchOrientation(req.getOrientation()),
-          req.getMaxRows());
+          req.getMaxRows(),
+          FetchType.getFetchType(req.getFetchType()));
       resp.setResults(rowSet.toTRowSet());
       resp.setHasMoreRows(false);
       resp.setStatus(OK_STATUS);
Index: service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
===================================================================
--- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java	(revision 1620975)
+++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java	(working copy)
@@ -22,18 +22,7 @@
 import java.util.Map;

 import org.apache.hive.service.auth.HiveAuthFactory;
-import org.apache.hive.service.cli.CLIServiceClient;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.OperationState;
-import org.apache.hive.service.cli.OperationStatus;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.RowSetFactory;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
 import org.apache.thrift.TException;

 /**
@@ -377,17 +366,15 @@
     }
   }

-  /* (non-Javadoc)
-   * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long)
-   */
   @Override
-  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
-      throws HiveSQLException {
+  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows,
+      FetchType fetchType) throws HiveSQLException {
     try {
       TFetchResultsReq req = new TFetchResultsReq();
       req.setOperationHandle(opHandle.toTOperationHandle());
       req.setOrientation(orientation.toTFetchOrientation());
       req.setMaxRows(maxRows);
+      req.setFetchType(fetchType.toTFetchType());
       TFetchResultsResp resp = cliService.FetchResults(req);
       checkStatus(resp.getStatus());
       return RowSetFactory.create(resp.getResults(), opHandle.getProtocolVersion());
@@ -404,7 +391,7 @@
   @Override
   public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
     // TODO: set the correct default fetch size
-    return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000);
+    return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000, FetchType.QUERY_OUTPUT);
   }

   @Override
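With both sides of the RPC in place, a client can interleave log fetches and result fetches against the same operation handle. A minimal sketch, assuming client and sessionHandle are already wired up as in the test below, and sql is whatever statement the caller runs:

    // Fire the statement asynchronously, then stream its log while it runs.
    OperationHandle op = client.executeStatementAsync(sessionHandle, sql, null);

    // FetchType.LOG returns log lines; FETCH_NEXT resumes after the previous
    // log fetch, so repeated calls accumulate the log incrementally.
    RowSet logRows = client.fetchResults(op, FetchOrientation.FETCH_NEXT, 1000, FetchType.LOG);
    for (Object[] row : logRows) {
      System.out.print(row[0]);  // each row carries one chunk of the operation log
    }

    // FetchType.QUERY_OUTPUT (the default) still fetches ordinary result rows.
    RowSet results = client.fetchResults(op, FetchOrientation.FETCH_NEXT, 100,
        FetchType.QUERY_OUTPUT);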
Index: service/src/test/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java
===================================================================
--- service/src/test/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java	(revision 0)
+++ service/src/test/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java	(working copy)
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.cli.operation;
+
+import java.io.File;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.cli.*;
+import org.apache.hive.service.cli.thrift.EmbeddedThriftBinaryCLIService;
+import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * TestOperationLoggingAPI
+ * Tests FetchResults with TFetchType.LOG at the thrift level.
+ */
+public class TestOperationLoggingAPI {
+  private HiveConf hiveConf = new HiveConf();
+  private String tableName = "testOperationLoggingAPI_table";
+  private File dataFile;
+  private ThriftCLIServiceClient client;
+  private SessionHandle sessionHandle;
+  private String sql = "select * from " + tableName;
+  private String[] expectedLogs = {
+      "Parsing command",
+      "Parse Completed",
+      "Starting Semantic Analysis",
+      "Semantic Analysis Completed",
+      "Starting command"
+  };
+
+  /**
+   * Start HS2 in embedded mode, open a session, and create a table for the test cases to use
+   * @throws Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    dataFile = new File(hiveConf.get("test.data.files"), "kv1.txt");
+    EmbeddedThriftBinaryCLIService service = new EmbeddedThriftBinaryCLIService();
+    service.init(hiveConf);
+    client = new ThriftCLIServiceClient(service);
+    sessionHandle = setupSession();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // Cleanup
+    String queryString = "DROP TABLE " + tableName;
+    client.executeStatement(sessionHandle, queryString, null);
+
+    client.closeSession(sessionHandle);
+  }
+
+  @Test
+  public void testFetchResultsOfLog() throws Exception {
+    // Verify that the sql operation log is generated and fetched correctly.
+    OperationHandle operationHandle = client.executeStatement(sessionHandle, sql, null);
+    RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000,
+        FetchType.LOG);
+    verifyFetchedLog(rowSetLog);
+  }
+  @Test
+  public void testFetchResultsOfLogAsync() throws Exception {
+    // Verify that the sql operation log is generated and fetched correctly in async mode.
+    OperationHandle operationHandle = client.executeStatementAsync(sessionHandle, sql, null);
+
+    // Poll the operation status until the query completes
+    boolean isQueryRunning = true;
+    long pollTimeout = System.currentTimeMillis() + 100000;
+    OperationStatus opStatus;
+    OperationState state = null;
+    RowSet rowSetAccumulated = null;
+    StringBuilder logs = new StringBuilder();
+
+    while (isQueryRunning) {
+      // Break if polling times out
+      if (System.currentTimeMillis() > pollTimeout) {
+        break;
+      }
+      opStatus = client.getOperationStatus(operationHandle);
+      Assert.assertNotNull(opStatus);
+      state = opStatus.getState();
+
+      rowSetAccumulated = client.fetchResults(operationHandle, FetchOrientation.FETCH_NEXT, 1000,
+          FetchType.LOG);
+      for (Object[] row : rowSetAccumulated) {
+        logs.append(row[0]);
+      }
+
+      if (state == OperationState.CANCELED ||
+          state == OperationState.CLOSED ||
+          state == OperationState.FINISHED ||
+          state == OperationState.ERROR) {
+        isQueryRunning = false;
+      }
+      Thread.sleep(10);
+    }
+    // The sql should be completed by now.
+    Assert.assertEquals("Query should be finished", OperationState.FINISHED, state);
+
+    // Verify the accumulated logs
+    verifyFetchedLog(logs.toString());
+
+    // Verify the logs fetched from the beginning of the log file
+    RowSet rowSet = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000,
+        FetchType.LOG);
+    verifyFetchedLog(rowSet);
+  }
+
+  @Test
+  public void testFetchResultsOfLogWithOrientation() throws Exception {
+    // (FETCH_FIRST) execute a sql, and fetch its sql operation log as the expected value
+    OperationHandle operationHandle = client.executeStatement(sessionHandle, sql, null);
+    RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000,
+        FetchType.LOG);
+    int expectedLogLength = rowSetLog.numRows();
+
+    // (FETCH_NEXT) execute the same sql again,
+    // and fetch the sql operation log with the FETCH_NEXT orientation
+    OperationHandle operationHandleWithOrientation = client.executeStatement(sessionHandle, sql,
+        null);
+    RowSet rowSetLogWithOrientation;
+    int logLength = 0;
+    int maxRows = calculateProperMaxRows(expectedLogLength);
+    do {
+      rowSetLogWithOrientation = client.fetchResults(operationHandleWithOrientation,
+          FetchOrientation.FETCH_NEXT, maxRows, FetchType.LOG);
+      logLength += rowSetLogWithOrientation.numRows();
+    } while (rowSetLogWithOrientation.numRows() == maxRows);
+    Assert.assertEquals(expectedLogLength, logLength);
+
+    // (FETCH_FIRST) fetch again from the same operation handle with the FETCH_FIRST orientation
+    rowSetLogWithOrientation = client.fetchResults(operationHandleWithOrientation,
+        FetchOrientation.FETCH_FIRST, 1000, FetchType.LOG);
+    verifyFetchedLog(rowSetLogWithOrientation);
+  }
+  @Test
+  public void testFetchResultsOfLogCleanup() throws Exception {
+    // Verify the cleanup functionality.
+    // Open a new session, since this case needs to close the session at the end.
+    SessionHandle sessionHandleCleanup = setupSession();
+
+    // prepare
+    OperationHandle operationHandle = client.executeStatement(sessionHandleCleanup, sql, null);
+    RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000,
+        FetchType.LOG);
+    verifyFetchedLog(rowSetLog);
+
+    File sessionLogDir = new File(
+        hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION) +
+            File.separator + sessionHandleCleanup.getHandleIdentifier());
+    File operationLogFile = new File(sessionLogDir,
+        operationHandle.getHandleIdentifier().toString());
+
+    // Check that an exception is thrown when fetching the log of a closed operation.
+    client.closeOperation(operationHandle);
+    try {
+      client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, FetchType.LOG);
+      Assert.fail("Fetch should fail");
+    } catch (HiveSQLException e) {
+      Assert.assertTrue(e.getMessage().contains("Invalid OperationHandle:"));
+    }
+
+    // Check that the operation log file is deleted.
+    if (operationLogFile.exists()) {
+      Assert.fail("Operation log file should be deleted.");
+    }
+
+    // Check that the session log dir is deleted after the session is closed.
+    client.closeSession(sessionHandleCleanup);
+    if (sessionLogDir.exists()) {
+      Assert.fail("Session log dir should be deleted.");
+    }
+  }
+
+  private SessionHandle setupSession() throws Exception {
+    // Open a session
+    SessionHandle sessionHandle = client.openSession(null, null, null);
+
+    // Change lock manager to embedded mode
+    String queryString = "SET hive.lock.manager=" +
+        "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager";
+    client.executeStatement(sessionHandle, queryString, null);
+
+    // Drop the table if it exists
+    queryString = "DROP TABLE IF EXISTS " + tableName;
+    client.executeStatement(sessionHandle, queryString, null);
+
+    // Create a test table
+    queryString = "create table " + tableName + " (key int, value string)";
+    client.executeStatement(sessionHandle, queryString, null);
+
+    // Load data
+    queryString = "load data local inpath '" + dataFile + "' into table " + tableName;
+    client.executeStatement(sessionHandle, queryString, null);
+
+    // Precondition check: verify that the table is created and the data is fetched correctly.
+    OperationHandle operationHandle = client.executeStatement(sessionHandle, sql, null);
+    RowSet rowSetResult = client.fetchResults(operationHandle);
+    Assert.assertEquals(500, rowSetResult.numRows());
+    Assert.assertEquals(238, rowSetResult.iterator().next()[0]);
+    Assert.assertEquals("val_238", rowSetResult.iterator().next()[1]);
+
+    return sessionHandle;
+  }
+
+  // The log length of a sql operation may vary as Hive evolves, so calculate a proper maxRows.
+  private int calculateProperMaxRows(int len) {
+    if (len < 10) {
+      return 1;
+    } else if (len < 100) {
+      return 10;
+    } else {
+      return 100;
+    }
+  }
+
+  private void verifyFetchedLog(RowSet rowSet) {
+    StringBuilder stringBuilder = new StringBuilder();
+
+    for (Object[] row : rowSet) {
+      stringBuilder.append(row[0]);
+    }
+
+    String logs = stringBuilder.toString();
+    verifyFetchedLog(logs);
+  }
+
+  private void verifyFetchedLog(String logs) {
+    for (String log : expectedLogs) {
+      Assert.assertTrue(logs.contains(log));
+    }
+  }
+}