diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3bfc681..86062db 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1383,6 +1383,14 @@ "to construct a list exception handlers to handle exceptions thrown\n" + "by record readers"), + // operation log configuration + HIVE_SERVER2_LOGGING_OPERATION_ENABLED("hive.server2.logging.operation.enabled", true, + "When true, HS2 will save operation logs"), + HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION("hive.server2.logging.operation.log.location", + "${system:java.io.tmpdir}" + File.separator + "${system:user.name}" + File.separator + + "operation_logs", + "Top level directory where operation logs are stored if logging functionality is enabled"), + // logging configuration HIVE_LOG4J_FILE("hive.log4j.file", "", "Hive log4j configuration file.\n" + diff --git service/if/TCLIService.thrift service/if/TCLIService.thrift index 80086b4..dd7d367 100644 --- service/if/TCLIService.thrift +++ service/if/TCLIService.thrift @@ -1054,6 +1054,9 @@ struct TFetchResultsReq { // Max number of rows that should be returned in // the rowset. 3: required i64 maxRows + + /** The type of a fetch results request. 0 represents Query output. 1 represents Log */ + 4: optional i16 fetchType = 0 } struct TFetchResultsResp { diff --git service/src/gen/thrift/gen-cpp/TCLIService_types.h service/src/gen/thrift/gen-cpp/TCLIService_types.h index 1b37fb5..f32dc3c 100644 --- service/src/gen/thrift/gen-cpp/TCLIService_types.h +++ service/src/gen/thrift/gen-cpp/TCLIService_types.h @@ -3602,14 +3602,18 @@ class TGetResultSetMetadataResp { void swap(TGetResultSetMetadataResp &a, TGetResultSetMetadataResp &b); +typedef struct _TFetchResultsReq__isset { + _TFetchResultsReq__isset() : fetchType(true) {} + bool fetchType; +} _TFetchResultsReq__isset; class TFetchResultsReq { public: - static const char* ascii_fingerprint; // = "1B96A8C05BA9DD699FC8CD842240ABDE"; - static const uint8_t binary_fingerprint[16]; // = {0x1B,0x96,0xA8,0xC0,0x5B,0xA9,0xDD,0x69,0x9F,0xC8,0xCD,0x84,0x22,0x40,0xAB,0xDE}; + static const char* ascii_fingerprint; // = "B4CB1E4F8F8F4D50183DD372AD11753A"; + static const uint8_t binary_fingerprint[16]; // = {0xB4,0xCB,0x1E,0x4F,0x8F,0x8F,0x4D,0x50,0x18,0x3D,0xD3,0x72,0xAD,0x11,0x75,0x3A}; - TFetchResultsReq() : orientation((TFetchOrientation::type)0), maxRows(0) { + TFetchResultsReq() : orientation((TFetchOrientation::type)0), maxRows(0), fetchType(0) { orientation = (TFetchOrientation::type)0; } @@ -3619,6 +3623,9 @@ class TFetchResultsReq { TOperationHandle operationHandle; TFetchOrientation::type orientation; int64_t maxRows; + int16_t fetchType; + + _TFetchResultsReq__isset __isset; void __set_operationHandle(const TOperationHandle& val) { operationHandle = val; @@ -3632,6 +3639,11 @@ class TFetchResultsReq { maxRows = val; } + void __set_fetchType(const int16_t val) { + fetchType = val; + __isset.fetchType = true; + } + bool operator == (const TFetchResultsReq & rhs) const { if (!(operationHandle == rhs.operationHandle)) @@ -3640,6 +3652,10 @@ class TFetchResultsReq { return false; if (!(maxRows == rhs.maxRows)) return false; + if (__isset.fetchType != rhs.__isset.fetchType) + return false; + else if (__isset.fetchType && !(fetchType == rhs.fetchType)) + return false; return true; } bool operator != (const TFetchResultsReq &rhs) const { diff --git service/src/gen/thrift/gen-cpp/TCLIService_types.cpp 
service/src/gen/thrift/gen-cpp/TCLIService_types.cpp index d5f98a8..326d25b 100644 --- service/src/gen/thrift/gen-cpp/TCLIService_types.cpp +++ service/src/gen/thrift/gen-cpp/TCLIService_types.cpp @@ -6137,8 +6137,8 @@ void swap(TGetResultSetMetadataResp &a, TGetResultSetMetadataResp &b) { swap(a.__isset, b.__isset); } -const char* TFetchResultsReq::ascii_fingerprint = "1B96A8C05BA9DD699FC8CD842240ABDE"; -const uint8_t TFetchResultsReq::binary_fingerprint[16] = {0x1B,0x96,0xA8,0xC0,0x5B,0xA9,0xDD,0x69,0x9F,0xC8,0xCD,0x84,0x22,0x40,0xAB,0xDE}; +const char* TFetchResultsReq::ascii_fingerprint = "B4CB1E4F8F8F4D50183DD372AD11753A"; +const uint8_t TFetchResultsReq::binary_fingerprint[16] = {0xB4,0xCB,0x1E,0x4F,0x8F,0x8F,0x4D,0x50,0x18,0x3D,0xD3,0x72,0xAD,0x11,0x75,0x3A}; uint32_t TFetchResultsReq::read(::apache::thrift::protocol::TProtocol* iprot) { @@ -6189,6 +6189,14 @@ uint32_t TFetchResultsReq::read(::apache::thrift::protocol::TProtocol* iprot) { xfer += iprot->skip(ftype); } break; + case 4: + if (ftype == ::apache::thrift::protocol::T_I16) { + xfer += iprot->readI16(this->fetchType); + this->__isset.fetchType = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -6223,6 +6231,11 @@ uint32_t TFetchResultsReq::write(::apache::thrift::protocol::TProtocol* oprot) c xfer += oprot->writeI64(this->maxRows); xfer += oprot->writeFieldEnd(); + if (this->__isset.fetchType) { + xfer += oprot->writeFieldBegin("fetchType", ::apache::thrift::protocol::T_I16, 4); + xfer += oprot->writeI16(this->fetchType); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -6233,6 +6246,8 @@ void swap(TFetchResultsReq &a, TFetchResultsReq &b) { swap(a.operationHandle, b.operationHandle); swap(a.orientation, b.orientation); swap(a.maxRows, b.maxRows); + swap(a.fetchType, b.fetchType); + swap(a.__isset, b.__isset); } const char* TFetchResultsResp::ascii_fingerprint = "FC43BC2D6F3B76D4DB0F34226A745C8E"; diff --git service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java index 808b73f..05fc4d8 100644 --- service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java +++ service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java @@ -37,6 +37,7 @@ private static final org.apache.thrift.protocol.TField OPERATION_HANDLE_FIELD_DESC = new org.apache.thrift.protocol.TField("operationHandle", org.apache.thrift.protocol.TType.STRUCT, (short)1); private static final org.apache.thrift.protocol.TField ORIENTATION_FIELD_DESC = new org.apache.thrift.protocol.TField("orientation", org.apache.thrift.protocol.TType.I32, (short)2); private static final org.apache.thrift.protocol.TField MAX_ROWS_FIELD_DESC = new org.apache.thrift.protocol.TField("maxRows", org.apache.thrift.protocol.TType.I64, (short)3); + private static final org.apache.thrift.protocol.TField FETCH_TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("fetchType", org.apache.thrift.protocol.TType.I16, (short)4); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -47,6 +48,7 @@ private TOperationHandle operationHandle; // required private TFetchOrientation orientation; // required private long maxRows; // required + private short fetchType; // optional /** The set of fields this struct contains, along with convenience 
methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -56,7 +58,11 @@ * @see TFetchOrientation */ ORIENTATION((short)2, "orientation"), - MAX_ROWS((short)3, "maxRows"); + MAX_ROWS((short)3, "maxRows"), + /** + * The type of a fetch results request. + */ + FETCH_TYPE((short)4, "fetchType"); private static final Map byName = new HashMap(); @@ -77,6 +83,8 @@ public static _Fields findByThriftId(int fieldId) { return ORIENTATION; case 3: // MAX_ROWS return MAX_ROWS; + case 4: // FETCH_TYPE + return FETCH_TYPE; default: return null; } @@ -118,7 +126,9 @@ public String getFieldName() { // isset id assignments private static final int __MAXROWS_ISSET_ID = 0; + private static final int __FETCHTYPE_ISSET_ID = 1; private byte __isset_bitfield = 0; + private _Fields optionals[] = {_Fields.FETCH_TYPE}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -128,6 +138,8 @@ public String getFieldName() { new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, TFetchOrientation.class))); tmpMap.put(_Fields.MAX_ROWS, new org.apache.thrift.meta_data.FieldMetaData("maxRows", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.FETCH_TYPE, new org.apache.thrift.meta_data.FieldMetaData("fetchType", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I16))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TFetchResultsReq.class, metaDataMap); } @@ -135,6 +147,8 @@ public String getFieldName() { public TFetchResultsReq() { this.orientation = org.apache.hive.service.cli.thrift.TFetchOrientation.FETCH_NEXT; + this.fetchType = (short)0; + } public TFetchResultsReq( @@ -161,6 +175,7 @@ public TFetchResultsReq(TFetchResultsReq other) { this.orientation = other.orientation; } this.maxRows = other.maxRows; + this.fetchType = other.fetchType; } public TFetchResultsReq deepCopy() { @@ -174,6 +189,8 @@ public void clear() { setMaxRowsIsSet(false); this.maxRows = 0; + this.fetchType = (short)0; + } public TOperationHandle getOperationHandle() { @@ -252,6 +269,34 @@ public void setMaxRowsIsSet(boolean value) { __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __MAXROWS_ISSET_ID, value); } + /** + * The type of a fetch results request. + */ + public short getFetchType() { + return this.fetchType; + } + + /** + * The type of a fetch results request. 
+ */ + public void setFetchType(short fetchType) { + this.fetchType = fetchType; + setFetchTypeIsSet(true); + } + + public void unsetFetchType() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __FETCHTYPE_ISSET_ID); + } + + /** Returns true if field fetchType is set (has been assigned a value) and false otherwise */ + public boolean isSetFetchType() { + return EncodingUtils.testBit(__isset_bitfield, __FETCHTYPE_ISSET_ID); + } + + public void setFetchTypeIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __FETCHTYPE_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case OPERATION_HANDLE: @@ -278,6 +323,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case FETCH_TYPE: + if (value == null) { + unsetFetchType(); + } else { + setFetchType((Short)value); + } + break; + } } @@ -292,6 +345,9 @@ public Object getFieldValue(_Fields field) { case MAX_ROWS: return Long.valueOf(getMaxRows()); + case FETCH_TYPE: + return Short.valueOf(getFetchType()); + } throw new IllegalStateException(); } @@ -309,6 +365,8 @@ public boolean isSet(_Fields field) { return isSetOrientation(); case MAX_ROWS: return isSetMaxRows(); + case FETCH_TYPE: + return isSetFetchType(); } throw new IllegalStateException(); } @@ -353,6 +411,15 @@ public boolean equals(TFetchResultsReq that) { return false; } + boolean this_present_fetchType = true && this.isSetFetchType(); + boolean that_present_fetchType = true && that.isSetFetchType(); + if (this_present_fetchType || that_present_fetchType) { + if (!(this_present_fetchType && that_present_fetchType)) + return false; + if (this.fetchType != that.fetchType) + return false; + } + return true; } @@ -375,6 +442,11 @@ public int hashCode() { if (present_maxRows) builder.append(maxRows); + boolean present_fetchType = true && (isSetFetchType()); + builder.append(present_fetchType); + if (present_fetchType) + builder.append(fetchType); + return builder.toHashCode(); } @@ -416,6 +488,16 @@ public int compareTo(TFetchResultsReq other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetFetchType()).compareTo(typedOther.isSetFetchType()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetFetchType()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.fetchType, typedOther.fetchType); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -455,6 +537,12 @@ public String toString() { sb.append("maxRows:"); sb.append(this.maxRows); first = false; + if (isSetFetchType()) { + if (!first) sb.append(", "); + sb.append("fetchType:"); + sb.append(this.fetchType); + first = false; + } sb.append(")"); return sb.toString(); } @@ -540,6 +628,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TFetchResultsReq st org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 4: // FETCH_TYPE + if (schemeField.type == org.apache.thrift.protocol.TType.I16) { + struct.fetchType = iprot.readI16(); + struct.setFetchTypeIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -566,6 +662,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TFetchResultsReq s oprot.writeFieldBegin(MAX_ROWS_FIELD_DESC); oprot.writeI64(struct.maxRows); oprot.writeFieldEnd(); + if (struct.isSetFetchType()) { + 
oprot.writeFieldBegin(FETCH_TYPE_FIELD_DESC); + oprot.writeI16(struct.fetchType); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -586,6 +687,14 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TFetchResultsReq st struct.operationHandle.write(oprot); oprot.writeI32(struct.orientation.getValue()); oprot.writeI64(struct.maxRows); + BitSet optionals = new BitSet(); + if (struct.isSetFetchType()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetFetchType()) { + oprot.writeI16(struct.fetchType); + } } @Override @@ -598,6 +707,11 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TFetchResultsReq str struct.setOrientationIsSet(true); struct.maxRows = iprot.readI64(); struct.setMaxRowsIsSet(true); + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.fetchType = iprot.readI16(); + struct.setFetchTypeIsSet(true); + } } } diff --git service/src/gen/thrift/gen-py/TCLIService/ttypes.py service/src/gen/thrift/gen-py/TCLIService/ttypes.py index 2cbbdd8..0672ef1 100644 --- service/src/gen/thrift/gen-py/TCLIService/ttypes.py +++ service/src/gen/thrift/gen-py/TCLIService/ttypes.py @@ -5752,6 +5752,7 @@ class TFetchResultsReq: - operationHandle - orientation - maxRows + - fetchType: The type of a fetch results request. """ thrift_spec = ( @@ -5759,12 +5760,14 @@ class TFetchResultsReq: (1, TType.STRUCT, 'operationHandle', (TOperationHandle, TOperationHandle.thrift_spec), None, ), # 1 (2, TType.I32, 'orientation', None, 0, ), # 2 (3, TType.I64, 'maxRows', None, None, ), # 3 + (4, TType.I16, 'fetchType', None, 0, ), # 4 ) - def __init__(self, operationHandle=None, orientation=thrift_spec[2][4], maxRows=None,): + def __init__(self, operationHandle=None, orientation=thrift_spec[2][4], maxRows=None, fetchType=thrift_spec[4][4],): self.operationHandle = operationHandle self.orientation = orientation self.maxRows = maxRows + self.fetchType = fetchType def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -5791,6 +5794,11 @@ def read(self, iprot): self.maxRows = iprot.readI64(); else: iprot.skip(ftype) + elif fid == 4: + if ftype == TType.I16: + self.fetchType = iprot.readI16(); + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -5813,6 +5821,10 @@ def write(self, oprot): oprot.writeFieldBegin('maxRows', TType.I64, 3) oprot.writeI64(self.maxRows) oprot.writeFieldEnd() + if self.fetchType is not None: + oprot.writeFieldBegin('fetchType', TType.I16, 4) + oprot.writeI16(self.fetchType) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() diff --git service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb index 93f9a81..88a570f 100644 --- service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb +++ service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb @@ -1598,11 +1598,14 @@ class TFetchResultsReq OPERATIONHANDLE = 1 ORIENTATION = 2 MAXROWS = 3 + FETCHTYPE = 4 FIELDS = { OPERATIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'operationHandle', :class => ::TOperationHandle}, ORIENTATION => {:type => ::Thrift::Types::I32, :name => 'orientation', :default => 0, :enum_class => ::TFetchOrientation}, - MAXROWS => {:type => ::Thrift::Types::I64, :name => 'maxRows'} + MAXROWS => {:type => ::Thrift::Types::I64, :name => 'maxRows'}, + # The type of a fetch results 
request. + FETCHTYPE => {:type => ::Thrift::Types::I16, :name => 'fetchType', :default => 0, :optional => true} } def struct_fields; FIELDS; end diff --git service/src/java/org/apache/hive/service/cli/CLIService.java service/src/java/org/apache/hive/service/cli/CLIService.java index add37a1..17beb3d 100644 --- service/src/java/org/apache/hive/service/cli/CLIService.java +++ service/src/java/org/apache/hive/service/cli/CLIService.java @@ -397,26 +397,35 @@ public TableSchema getResultSetMetadata(OperationHandle opHandle) return tableSchema; } + @Override + public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows) throws HiveSQLException { + return fetchResults(opHandle, orientation, maxRows, FetchType.QUERY_OUTPUT); + } + /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long) + * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle) */ @Override - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) + public RowSet fetchResults(OperationHandle opHandle) + throws HiveSQLException { + return fetchResults(opHandle, FetchType.QUERY_OUTPUT); + } + + @Override + public RowSet fetchResults(OperationHandle opHandle, FetchType fetchType) throws HiveSQLException { RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle) - .getParentSession().fetchResults(opHandle, orientation, maxRows); + .getParentSession().fetchResults(opHandle, fetchType); LOG.debug(opHandle + ": fetchResults()"); return rowSet; } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle) - */ @Override - public RowSet fetchResults(OperationHandle opHandle) - throws HiveSQLException { + public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException { RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle) - .getParentSession().fetchResults(opHandle); + .getParentSession().fetchResults(opHandle, orientation, maxRows, fetchType); LOG.debug(opHandle + ": fetchResults()"); return rowSet; } diff --git service/src/java/org/apache/hive/service/cli/CLIServiceClient.java service/src/java/org/apache/hive/service/cli/CLIServiceClient.java index 87c10b9..59a4d92 100644 --- service/src/java/org/apache/hive/service/cli/CLIServiceClient.java +++ service/src/java/org/apache/hive/service/cli/CLIServiceClient.java @@ -28,19 +28,29 @@ * */ public abstract class CLIServiceClient implements ICLIService { + private static final long DEFAULT_MAX_ROWS = 1000; public SessionHandle openSession(String username, String password) throws HiveSQLException { return openSession(username, password, Collections.emptyMap()); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle) - */ @Override public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { // TODO: provide STATIC default value - return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000); + return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, DEFAULT_MAX_ROWS, FetchType.QUERY_OUTPUT); + } + + @Override + public RowSet fetchResults(OperationHandle opHandle, FetchType fetchType) + throws HiveSQLException { + return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 
DEFAULT_MAX_ROWS, fetchType); + } + + @Override + public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows) throws HiveSQLException { + return fetchResults(opHandle, orientation, maxRows, FetchType.QUERY_OUTPUT); } @Override diff --git service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java index f665146..9cad5be 100644 --- service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java +++ service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java @@ -181,13 +181,10 @@ public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQL return cliService.getResultSetMetadata(opHandle); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long) - */ @Override - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) - throws HiveSQLException { - return cliService.fetchResults(opHandle, orientation, maxRows); + public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException { + return cliService.fetchResults(opHandle, orientation, maxRows, fetchType); } diff --git service/src/java/org/apache/hive/service/cli/FetchType.java service/src/java/org/apache/hive/service/cli/FetchType.java new file mode 100644 index 0000000..a8e7fe1 --- /dev/null +++ service/src/java/org/apache/hive/service/cli/FetchType.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli; + +/** + * FetchType indicates the type of a fetchResults request. + * It maps to the fetchType field of TFetchResultsReq, which is generated from the Thrift interface.
+ */ +public enum FetchType { + QUERY_OUTPUT((short)0), + LOG((short)1); + + private final short tFetchType; + + FetchType(short tFetchType) { + this.tFetchType = tFetchType; + } + + public static FetchType getFetchType(short tFetchType) { + for (FetchType fetchType : values()) { + if (tFetchType == fetchType.toTFetchType()) { + return fetchType; + } + } + return QUERY_OUTPUT; + } + + public short toTFetchType() { + return tFetchType; + } +} diff --git service/src/java/org/apache/hive/service/cli/ICLIService.java service/src/java/org/apache/hive/service/cli/ICLIService.java index c569796..6a6439f 100644 --- service/src/java/org/apache/hive/service/cli/ICLIService.java +++ service/src/java/org/apache/hive/service/cli/ICLIService.java @@ -27,79 +27,86 @@ public interface ICLIService { - public abstract SessionHandle openSession(String username, String password, + SessionHandle openSession(String username, String password, Map configuration) throws HiveSQLException; - public abstract SessionHandle openSessionWithImpersonation(String username, String password, + SessionHandle openSessionWithImpersonation(String username, String password, Map configuration, String delegationToken) throws HiveSQLException; - public abstract void closeSession(SessionHandle sessionHandle) + void closeSession(SessionHandle sessionHandle) throws HiveSQLException; - public abstract GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType infoType) + GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType infoType) throws HiveSQLException; - public abstract OperationHandle executeStatement(SessionHandle sessionHandle, String statement, + OperationHandle executeStatement(SessionHandle sessionHandle, String statement, Map confOverlay) throws HiveSQLException; - public abstract OperationHandle executeStatementAsync(SessionHandle sessionHandle, + OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, Map confOverlay) throws HiveSQLException; - public abstract OperationHandle getTypeInfo(SessionHandle sessionHandle) + OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException; - public abstract OperationHandle getCatalogs(SessionHandle sessionHandle) + OperationHandle getCatalogs(SessionHandle sessionHandle) throws HiveSQLException; - public abstract OperationHandle getSchemas(SessionHandle sessionHandle, + OperationHandle getSchemas(SessionHandle sessionHandle, String catalogName, String schemaName) throws HiveSQLException; - public abstract OperationHandle getTables(SessionHandle sessionHandle, + OperationHandle getTables(SessionHandle sessionHandle, String catalogName, String schemaName, String tableName, List tableTypes) throws HiveSQLException; - public abstract OperationHandle getTableTypes(SessionHandle sessionHandle) + OperationHandle getTableTypes(SessionHandle sessionHandle) throws HiveSQLException; - public abstract OperationHandle getColumns(SessionHandle sessionHandle, + OperationHandle getColumns(SessionHandle sessionHandle, String catalogName, String schemaName, String tableName, String columnName) throws HiveSQLException; - public abstract OperationHandle getFunctions(SessionHandle sessionHandle, + OperationHandle getFunctions(SessionHandle sessionHandle, String catalogName, String schemaName, String functionName) throws HiveSQLException; - public abstract OperationStatus getOperationStatus(OperationHandle opHandle) + OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException; - public abstract void 
cancelOperation(OperationHandle opHandle) + void cancelOperation(OperationHandle opHandle) throws HiveSQLException; - public abstract void closeOperation(OperationHandle opHandle) + void closeOperation(OperationHandle opHandle) throws HiveSQLException; - public abstract TableSchema getResultSetMetadata(OperationHandle opHandle) + TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException; - public abstract RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, - long maxRows) - throws HiveSQLException; + @Deprecated + RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows) throws HiveSQLException; + + @Deprecated + RowSet fetchResults(OperationHandle opHandle) + throws HiveSQLException; - public abstract RowSet fetchResults(OperationHandle opHandle) + RowSet fetchResults(OperationHandle opHandle, FetchType fetchType) throws HiveSQLException; - public abstract String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, + RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException; + + String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, String owner, String renewer) throws HiveSQLException; - public abstract void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, + void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException; - public abstract void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, + void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException; diff --git service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java index c9fd5f9..fd0dec4 100644 --- service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java +++ service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java @@ -46,7 +46,7 @@ protected GetCatalogsOperation(HiveSession parentSession) { * @see org.apache.hive.service.cli.Operation#run() */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); setState(OperationState.FINISHED); } diff --git service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java index caf413d..fa425c1 100644 --- service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java +++ service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java @@ -118,7 +118,7 @@ protected GetColumnsOperation(HiveSession parentSession, String catalogName, Str * @see org.apache.hive.service.cli.Operation#run() */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient(); diff --git service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java index fd4e94d..1931679 100644 --- service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java +++ 
service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java @@ -72,7 +72,7 @@ public GetFunctionsOperation(HiveSession parentSession, * @see org.apache.hive.service.cli.Operation#run() */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { if ((null == catalogName || "".equals(catalogName)) diff --git service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java index ebca996..b5fc351 100644 --- service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java +++ service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java @@ -54,7 +54,7 @@ protected GetSchemasOperation(HiveSession parentSession, * @see org.apache.hive.service.cli.Operation#run() */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient(); diff --git service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java index 05991e0..1c9c20f 100644 --- service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java +++ service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java @@ -54,7 +54,7 @@ protected GetTableTypesOperation(HiveSession parentSession) { * @see org.apache.hive.service.cli.Operation#run() */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { for (TableType type : TableType.values()) { diff --git service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java index 315dbea..a76e0dc 100644 --- service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java +++ service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java @@ -75,7 +75,7 @@ protected GetTablesOperation(HiveSession parentSession, * @see org.apache.hive.service.cli.Operation#run() */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient(); diff --git service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java index 0ec2543..6904214 100644 --- service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java +++ service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java @@ -83,7 +83,7 @@ protected GetTypeInfoOperation(HiveSession parentSession) { * @see org.apache.hive.service.cli.Operation#run() */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { for (Type type : Type.values()) { diff --git service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java index 3d3fddc..b887a2f 100644 --- service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java +++ 
service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java @@ -98,7 +98,7 @@ private void tearDownSessionIO() { * @see org.apache.hive.service.cli.operation.Operation#run() */ @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); try { String command = getStatement().trim(); @@ -136,6 +136,7 @@ public void close() throws HiveSQLException { setState(OperationState.CLOSED); tearDownSessionIO(); cleanTmpFile(); + cleanupOperationLog(); } /* (non-Javadoc) diff --git service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java new file mode 100644 index 0000000..7e61919 --- /dev/null +++ service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.operation; +import java.io.CharArrayWriter; + +import org.apache.log4j.Layout; +import org.apache.log4j.Logger; +import org.apache.log4j.WriterAppender; +import org.apache.log4j.spi.Filter; +import org.apache.log4j.spi.LoggingEvent; + +/** + * An Appender that diverts logs from individual threads to the OperationLog they belong to. + */ +public class LogDivertAppender extends WriterAppender { + private static final Logger LOG = Logger.getLogger(LogDivertAppender.class.getName()); + private final OperationManager operationManager; + + /** + * A log filter that excludes messages coming from the logger with the given name. + * We apply this filter on the Loggers used by the log diversion classes, so that + * they don't generate more logs for themselves when they process logs. + */ + private static class NameExclusionFilter extends Filter { + private String excludeLoggerName = null; + + public NameExclusionFilter(String excludeLoggerName) { + this.excludeLoggerName = excludeLoggerName; + } + + @Override + public int decide(LoggingEvent ev) { + if (ev.getLoggerName().equals(excludeLoggerName)) { + return Filter.DENY; + } + return Filter.NEUTRAL; + } + } + + /** This is where the log message will go to */ + private final CharArrayWriter writer = new CharArrayWriter(); + + public LogDivertAppender(Layout layout, OperationManager operationManager) { + setLayout(layout); + setWriter(writer); + setName("LogDivertAppender"); + this.operationManager = operationManager; + + // Filter out messages coming from the log processing classes, or we would create an infinite logging loop.
+ addFilter(new NameExclusionFilter(LOG.getName())); + addFilter(new NameExclusionFilter(OperationLog.class.getName())); + addFilter(new NameExclusionFilter(OperationManager.class.getName())); + } + + /** + * Overrides WriterAppender.subAppend(), which does the real logging. + * No need to worry about concurrency since log4j calls this synchronously. + */ + @Override + protected void subAppend(LoggingEvent event) { + super.subAppend(event); + // The event has now been written into our writer. Forward it to the owning OperationLog. + String logOutput = writer.toString(); + writer.reset(); + + OperationLog log = operationManager.getOperationLogByThread(); + if (log == null) { + LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName()); + return; + } + log.writeOperationLog(logOutput); + } +} diff --git service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java index e0d17a1..3a1e2a0 100644 --- service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java +++ service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java @@ -46,6 +46,7 @@ protected MetadataOperation(HiveSession parentSession, OperationType opType) { @Override public void close() throws HiveSQLException { setState(OperationState.CLOSED); + cleanupOperationLog(); } /** diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java index 45fbd61..a875257 100644 --- service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -17,6 +17,8 @@ */ package org.apache.hive.service.cli.operation; +import java.io.File; +import java.io.FileNotFoundException; import java.util.EnumSet; import java.util.concurrent.Future; @@ -46,6 +48,8 @@ protected volatile HiveSQLException operationException; protected final boolean runAsync; protected volatile Future backgroundHandle; + protected OperationLog operationLog; + protected boolean isOperationLogEnabled; protected static final EnumSet DEFAULT_FETCH_ORIENTATION_SET = EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST); @@ -106,6 +110,15 @@ protected void setHasResultSet(boolean hasResultSet) { opHandle.setHasResultSet(hasResultSet); } + + public OperationLog getOperationLog() { + return operationLog; + } + + public void setOperationLog(OperationLog operationLog) { + this.operationLog = operationLog; + } + protected final OperationState setState(OperationState newState) throws HiveSQLException { state.validateTransition(newState); this.state = newState; @@ -138,7 +151,95 @@ public boolean isFailed() { return OperationState.ERROR.equals(state); } - public abstract void run() throws HiveSQLException; + protected void createOperationLog() { + if (parentSession.isOperationLogEnabled()) { + File operationLogFile = new File(parentSession.getOperationLogSessionDir(), + opHandle.getHandleIdentifier().toString()); + isOperationLogEnabled = true; + + // create log file + try { + if (operationLogFile.exists()) { + operationLogFile.delete(); + } + if (!operationLogFile.createNewFile()) { + // the log file already exists and cannot be deleted. + // If it can be read/written, keep its contents and use it.
+ if (!operationLogFile.canRead() || !operationLogFile.canWrite()) { + LOG.warn("The existing operation log file cannot be recreated, " + + "and it cannot be read or written: " + operationLogFile.getAbsolutePath()); + isOperationLogEnabled = false; + return; + } + } + } catch (Exception e) { + LOG.warn("Unable to create operation log file: " + operationLogFile.getAbsolutePath(), e); + isOperationLogEnabled = false; + return; + } + + // create an OperationLog object for the log file created above + try { + operationLog = new OperationLog(opHandle.toString(), operationLogFile); + } catch (FileNotFoundException e) { + LOG.warn("Unable to instantiate OperationLog object for operation: " + + opHandle, e); + isOperationLogEnabled = false; + return; + } + + // register this operationLog with the current thread + OperationLog.setCurrentOperationLog(operationLog); + } + } + + protected void unregisterOperationLog() { + if (isOperationLogEnabled) { + OperationLog.removeCurrentOperationLog(); + } + } + + /** + * Invoked before runInternal(). + * Sets up preconditions and configuration. + */ + protected void beforeRun() { + createOperationLog(); + } + + /** + * Invoked after runInternal(), even if an exception is thrown in runInternal(). + * Cleans up resources that were set up in beforeRun(). + */ + protected void afterRun() { + unregisterOperationLog(); + } + + /** + * Implemented by subclasses of Operation to execute their specific behavior. + * @throws HiveSQLException + */ + protected abstract void runInternal() throws HiveSQLException; + + public void run() throws HiveSQLException { + beforeRun(); + try { + runInternal(); + } finally { + afterRun(); + } + } + + protected void cleanupOperationLog() throws HiveSQLException { + if (isOperationLogEnabled) { + if (operationLog == null) { + LOG.error("Operation [ " + opHandle.getHandleIdentifier() + " ] " + + "logging is enabled, but its OperationLog object cannot be found."); + } else { + operationLog.close(); + } + } + } // TODO: make this abstract and implement in subclasses. public void cancel() throws HiveSQLException { diff --git service/src/java/org/apache/hive/service/cli/operation/OperationLog.java service/src/java/org/apache/hive/service/cli/operation/OperationLog.java new file mode 100644 index 0000000..1698b87 --- /dev/null +++ service/src/java/org/apache/hive/service/cli/operation/OperationLog.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hive.service.cli.operation; + +import com.google.common.base.Charsets; +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.IOUtils; +import org.apache.hive.service.cli.FetchOrientation; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.OperationHandle; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; + +/** + * OperationLog wraps the actual operation log file, and provides an interface + * for accessing, reading, writing, and removing the file. + */ +public class OperationLog { + private static final Log LOG = LogFactory.getLog(OperationLog.class.getName()); + public static final FetchOrientation DEFAULT_ORIENTATION = FetchOrientation.FETCH_FIRST; + public static final long DEFAULT_MAX_ROWS = 1000; + + private final String operationName; + private final LogFile logFile; + + public OperationLog(String name, File file) throws FileNotFoundException { + operationName = name; + logFile = new LogFile(file); + } + + /** + * One OperationLog object per thread. + */ + private static final ThreadLocal THREAD_LOCAL_OPERATION_LOG = new + ThreadLocal() { + @Override + protected synchronized OperationLog initialValue() { + return null; + } + }; + + public static void setCurrentOperationLog(OperationLog operationLog) { + THREAD_LOCAL_OPERATION_LOG.set(operationLog); + } + + public static OperationLog getCurrentOperationLog() { + return THREAD_LOCAL_OPERATION_LOG.get(); + } + + public static void removeCurrentOperationLog() { + THREAD_LOCAL_OPERATION_LOG.remove(); + } + + /** + * Write operation execution logs into the log file + * @param operationLogMessage one line of log emitted from log4j + */ + public void writeOperationLog(String operationLogMessage) { + logFile.write(operationLogMessage); + } + + /** + * Read operation execution logs from the log file + * @param fetchOrientation one of the FetchOrientation enum values + * @param maxRows the max number of lines to fetch from the log + * @return the list of log lines read + * @throws HiveSQLException + */ + public List readOperationLog(FetchOrientation fetchOrientation, long maxRows) + throws HiveSQLException { + return logFile.read(fetchOrientation, maxRows); + } + + /** + * Close this OperationLog when the operation is closed. The log file will be removed.
+ * @throws HiveSQLException + */ + public void close() throws HiveSQLException { + logFile.remove(); + } + + /** + * Wrapper for reading/writing the operation log file + */ + private class LogFile { + private File file; + private BufferedReader in; + private PrintStream out; + + LogFile(File file) throws FileNotFoundException { + this.file = file; + in = new BufferedReader(new InputStreamReader(new FileInputStream(file))); + out = new PrintStream(new FileOutputStream(file)); + } + + synchronized void write(String msg) { + // write log to the file + out.print(msg); + } + + synchronized List read(FetchOrientation fetchOrientation, long maxRows) + throws HiveSQLException { + // reset the BufferedReader if fetching from the beginning of the file + if (fetchOrientation.equals(FetchOrientation.FETCH_FIRST)) { + resetIn(); + } + + return readResults(maxRows); + } + + void remove() throws HiveSQLException { + try { + FileUtils.forceDelete(file); + } catch (Exception e) { + throw new HiveSQLException("Failed to remove corresponding log file of operation: " + + operationName, e); + } + } + + private void resetIn() { + if (in != null) { + IOUtils.cleanup(LOG, in); + in = null; + } + } + + private List readResults(long nLines) throws HiveSQLException { + if (in == null) { + try { + in = new BufferedReader(new InputStreamReader(new FileInputStream(file))); + } catch (FileNotFoundException e) { + throw new HiveSQLException("Operation Log file " + file.getAbsolutePath() + " is not " + + "found.", e); + } + } + + List logs = new ArrayList(); + String line = ""; + // if nLines <= 0, read all lines in the log file. + for (int i = 0; i < nLines || nLines <= 0; i++) { + try { + line = in.readLine(); + if (line == null) { + break; + } else { + logs.add(line); + } + } catch (IOException e) { + throw new HiveSQLException("Reading the operation log file encountered an exception: ", e); + } + } + return logs; + } + } +} diff --git service/src/java/org/apache/hive/service/cli/operation/OperationManager.java service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 21c33bc..dd5f3f3 100644 --- service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -18,6 +18,7 @@ package org.apache.hive.service.cli.operation; +import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -25,22 +26,19 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hive.service.AbstractService; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.OperationState; -import org.apache.hive.service.cli.OperationStatus; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.*; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.log4j.*; /** * OperationManager.
* */ public class OperationManager extends AbstractService { - + private static final String DEFAULT_LAYOUT_PATTERN = "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n"; private final Log LOG = LogFactory.getLog(OperationManager.class.getName()); private HiveConf hiveConf; @@ -54,7 +52,11 @@ public OperationManager() { @Override public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; - + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { + initOperationLogCapture(); + } else { + LOG.debug("Operation level logging is turned off"); + } super.init(hiveConf); } @@ -70,6 +72,30 @@ public synchronized void stop() { super.stop(); } + private void initOperationLogCapture() { + // There should be a ConsoleAppender. Copy its Layout. + Logger root = Logger.getRootLogger(); + Layout layout = null; + + Enumeration appenders = root.getAllAppenders(); + while (appenders.hasMoreElements()) { + Appender ap = (Appender) appenders.nextElement(); + if (ap.getClass().equals(ConsoleAppender.class)) { + layout = ap.getLayout(); + break; + } + } + + if (layout == null) { + layout = new PatternLayout(DEFAULT_LAYOUT_PATTERN); + LOG.info("Cannot find a Layout from a ConsoleAppender. Using default Layout pattern."); + } + + // Register another Appender (with the same layout) that talks to us. + Appender ap = new LogDivertAppender(layout, this); + root.addAppender(ap); + } + public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, String statement, Map confOverlay, boolean runAsync) throws HiveSQLException { @@ -191,4 +217,45 @@ public RowSet getOperationNextRowSet(OperationHandle opHandle, throws HiveSQLException { return getOperation(opHandle).getNextRowSet(orientation, maxRows); } + + public RowSet getOperationLogRowSet(OperationHandle opHandle) + throws HiveSQLException { + return getOperationLogRowSet(opHandle, OperationLog.DEFAULT_ORIENTATION, + OperationLog.DEFAULT_MAX_ROWS); + } + + public RowSet getOperationLogRowSet(OperationHandle opHandle, + FetchOrientation orientation, long maxRows) + throws HiveSQLException { + // get the OperationLog object from the operation + OperationLog operationLog = getOperation(opHandle).getOperationLog(); + if (operationLog == null) { + throw new HiveSQLException("Couldn't find log associated with operation handle: " + opHandle); + } + + // read logs + List logs = operationLog.readOperationLog(orientation, maxRows); + + // convert logs to RowSet + TableSchema tableSchema = new TableSchema(getSchema()); + RowSet rowSet = RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion()); + for (String log : logs) { + rowSet.addRow(new String[] {log}); + } + + return rowSet; + } + + private Schema getSchema() { + Schema schema = new Schema(); + FieldSchema fieldSchema = new FieldSchema(); + fieldSchema.setName("operation_log"); + fieldSchema.setType("string"); + schema.addToFieldSchemas(fieldSchema); + return schema; + } + + public OperationLog getOperationLogByThread() { + return OperationLog.getCurrentOperationLog(); + } } diff --git service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index de54ca1..c27b4c3 100644 --- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -134,7 +134,7 @@ public void prepare(HiveConf sqlOperationConf) throws HiveSQLException { } } - private void runInternal(HiveConf 
sqlOperationConf) throws HiveSQLException { + private void runQuery(HiveConf sqlOperationConf) throws HiveSQLException { try { // In Hive server mode, we are not able to retry in the FetchTask // case, when calling fetch queries since execute() has returned. @@ -164,12 +164,12 @@ private void runInternal(HiveConf sqlOperationConf) throws HiveSQLException { } @Override - public void run() throws HiveSQLException { + public void runInternal() throws HiveSQLException { setState(OperationState.PENDING); final HiveConf opConfig = getConfigForOperation(); prepare(opConfig); if (!shouldRunAsync()) { - runInternal(opConfig); + runQuery(opConfig); } else { final SessionState parentSessionState = SessionState.get(); // current Hive object needs to be set in async thread in case of remote metastore. @@ -193,11 +193,15 @@ public Object run() throws HiveSQLException { // User information is part of the metastore client member in Hive Hive.set(sessionHive); SessionState.setCurrentSessionState(parentSessionState); + // Set the current OperationLog in this async thread so that the query log keeps being saved. + registerCurrentOperationLog(); try { - runInternal(opConfig); + runQuery(opConfig); } catch (HiveSQLException e) { setOperationException(e); LOG.error("Error running hive query: ", e); + } finally { + unregisterOperationLog(); } return null; } @@ -239,6 +243,16 @@ private Hive getCurrentHive() throws HiveSQLException { } } + private void registerCurrentOperationLog() throws HiveSQLException { + if (isOperationLogEnabled) { + if (operationLog == null) { + throw new HiveSQLException("Failed to get current OperationLog object of Operation: " + + getHandle().getHandleIdentifier()); + } + OperationLog.setCurrentOperationLog(operationLog); + } + } + private void cleanup(OperationState state) throws HiveSQLException { setState(state); if (shouldRunAsync()) { @@ -267,6 +281,7 @@ public void cancel() throws HiveSQLException { @Override public void close() throws HiveSQLException { cleanup(OperationState.CLOSED); + cleanupOperationLog(); } @Override diff --git service/src/java/org/apache/hive/service/cli/session/HiveSession.java service/src/java/org/apache/hive/service/cli/session/HiveSession.java index 9785e95..b067931 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSession.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSession.java @@ -23,13 +23,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hive.service.auth.HiveAuthFactory; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.GetInfoType; -import org.apache.hive.service.cli.GetInfoValue; -import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.*; public interface HiveSession extends HiveSessionBase { @@ -144,10 +138,10 @@ public OperationHandle getFunctions(String catalogName, String schemaName, public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException; - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) - throws HiveSQLException; + public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException; - public RowSet fetchResults(OperationHandle
opHandle, FetchType fetchType) throws HiveSQLException; public String getDelegationToken(HiveAuthFactory authFactory, String owner, String renewer) throws HiveSQLException; diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java index 4c3164e..e2ef4bb 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java @@ -24,6 +24,7 @@ import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; +import java.io.File; import java.util.Map; /** @@ -38,40 +39,57 @@ * Set the session manager for the session * @param sessionManager */ - public void setSessionManager(SessionManager sessionManager); + void setSessionManager(SessionManager sessionManager); /** * Get the session manager for the session */ - public SessionManager getSessionManager(); + SessionManager getSessionManager(); /** * Set operation manager for the session * @param operationManager */ - public void setOperationManager(OperationManager operationManager); + void setOperationManager(OperationManager operationManager); /** * Initialize the session * @param sessionConfMap */ - public void initialize(Map sessionConfMap); + void initialize(Map sessionConfMap); - public SessionHandle getSessionHandle(); + /** + * Check whether operation logging is enabled and session dir is created successfully + */ + boolean isOperationLogEnabled(); + + /** + * Get the session dir, which is the parent dir of operation logs + * @return a file representing the parent directory of operation logs + */ + File getOperationLogSessionDir(); + + /** + * Set the session dir, which is the parent dir of operation logs + * @param operationLogRootDir the parent dir of the session dir + */ + void setOperationLogSessionDir(File operationLogRootDir); + + SessionHandle getSessionHandle(); - public String getUsername(); + String getUsername(); - public String getPassword(); + String getPassword(); - public HiveConf getHiveConf(); + HiveConf getHiveConf(); - public SessionState getSessionState(); + SessionState getSessionState(); - public String getUserName(); + String getUserName(); - public void setUserName(String userName); + void setUserName(String userName); - public String getIpAddress(); + String getIpAddress(); - public void setIpAddress(String ipAddress); + void setIpAddress(String ipAddress); } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index b39d64d..539897b 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.cli.HiveFileProcessor; @@ -44,14 +45,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hive.service.auth.HiveAuthFactory; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.GetInfoType; -import org.apache.hive.service.cli.GetInfoValue; -import org.apache.hive.service.cli.HiveSQLException; -import 
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index b39d64d..539897b 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.Set;

+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.cli.HiveFileProcessor;
@@ -44,14 +45,7 @@
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.common.util.HiveVersionInfo;
 import org.apache.hive.service.auth.HiveAuthFactory;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
 import org.apache.hive.service.cli.operation.GetCatalogsOperation;
 import org.apache.hive.service.cli.operation.GetColumnsOperation;
@@ -86,6 +80,8 @@
   private OperationManager operationManager;
   private IMetaStoreClient metastoreClient = null;
   private final Set<OperationHandle> opHandleSet = new HashSet<OperationHandle>();
+  private boolean isOperationLogEnabled;
+  private File sessionLogDir;

   public HiveSessionImpl(TProtocolVersion protocol, String username, String password,
       HiveConf serverhiveConf, String ipAddress) {
@@ -182,6 +178,34 @@ private void configureSession(Map<String, String> sessionConfMap) {
   }

   @Override
+  public void setOperationLogSessionDir(File operationLogRootDir) {
+    sessionLogDir = new File(operationLogRootDir, sessionHandle.getHandleIdentifier().toString());
+    isOperationLogEnabled = true;
+
+    if (!sessionLogDir.exists()) {
+      if (!sessionLogDir.mkdir()) {
+        LOG.warn("Unable to create operation log session directory: " +
+            sessionLogDir.getAbsolutePath());
+        isOperationLogEnabled = false;
+      }
+    }
+
+    if (isOperationLogEnabled) {
+      LOG.info("Operation log session directory is created: " + sessionLogDir.getAbsolutePath());
+    }
+  }
+
+  @Override
+  public boolean isOperationLogEnabled() {
+    return isOperationLogEnabled;
+  }
+
+  @Override
+  public File getOperationLogSessionDir() {
+    return sessionLogDir;
+  }
+
+  @Override
   public TProtocolVersion getProtocolVersion() {
     return sessionHandle.getProtocolVersion();
   }
@@ -480,6 +504,9 @@ public void close() throws HiveSQLException {
       operationManager.closeOperation(opHandle);
     }
     opHandleSet.clear();
+    // Cleanup the session log directory.
+    cleanupSessionLogDir();
+
     HiveHistory hiveHist = sessionState.getHiveHistory();
     if (null != hiveHist) {
       hiveHist.closeStream();
@@ -492,6 +519,16 @@ public void close() throws HiveSQLException {
     }
   }

+  private void cleanupSessionLogDir() throws HiveSQLException {
+    if (isOperationLogEnabled) {
+      try {
+        FileUtils.forceDelete(sessionLogDir);
+      } catch (Exception e) {
+        throw new HiveSQLException("Failed to cleanup session log dir: " + sessionHandle, e);
+      }
+    }
+  }
+
   @Override
   public SessionState getSessionState() {
     return sessionState;
@@ -539,22 +576,32 @@ public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQL
   }

   @Override
-  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
-      throws HiveSQLException {
+  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+      long maxRows, FetchType fetchType) throws HiveSQLException {
     acquire();
     try {
-      return sessionManager.getOperationManager()
-          .getOperationNextRowSet(opHandle, orientation, maxRows);
+      if (fetchType == FetchType.QUERY_OUTPUT) {
+        return sessionManager.getOperationManager()
+            .getOperationNextRowSet(opHandle, orientation, maxRows);
+      } else {
+        return sessionManager.getOperationManager()
+            .getOperationLogRowSet(opHandle, orientation, maxRows);
+      }
     } finally {
       release();
     }
   }

   @Override
-  public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
+  public RowSet fetchResults(OperationHandle opHandle, FetchType fetchType)
+      throws HiveSQLException {
     acquire();
     try {
-      return sessionManager.getOperationManager().getOperationNextRowSet(opHandle);
+      if (fetchType == FetchType.QUERY_OUTPUT) {
+        return sessionManager.getOperationManager().getOperationNextRowSet(opHandle);
+      } else {
+        return sessionManager.getOperationManager().getOperationLogRowSet(opHandle);
+      }
     } finally {
       release();
     }
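
HiveSessionImpl composes the on-disk layout from two levels: the configured root directory holds one subdirectory per session, named by the session handle's UUID, and each operation's log lives in a file inside that directory named by the operation handle (the per-operation file name is taken from the cleanup test further below, not from these hunks). A small sketch of computing those paths, with placeholder values:

    import java.io.File;

    // Sketch: the operation-log directory layout implied by this patch.
    public class OperationLogPathSketch {
      public static void main(String[] args) {
        // Root dir comes from hive.server2.logging.operation.log.location; example value only.
        File operationLogRootDir = new File("/tmp/hive/operation_logs");
        String sessionId = "session-uuid";      // sessionHandle.getHandleIdentifier().toString()
        String operationId = "operation-uuid";  // opHandle.getHandleIdentifier().toString()

        File sessionLogDir = new File(operationLogRootDir, sessionId);  // setOperationLogSessionDir()
        File operationLogFile = new File(sessionLogDir, operationId);   // one log file per operation

        System.out.println(operationLogFile.getAbsolutePath());
      }
    }
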
diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index 816bea4..895661a 100644
--- service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -18,6 +18,8 @@
 package org.apache.hive.service.cli.session;

+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,6 +28,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;

+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -53,6 +56,8 @@
       new ConcurrentHashMap<SessionHandle, HiveSession>();
   private final OperationManager operationManager = new OperationManager();
   private ThreadPoolExecutor backgroundOperationPool;
+  private boolean isOperationLogEnabled;
+  private File operationLogRootDir;

   public SessionManager() {
     super("SessionManager");
@@ -80,6 +85,11 @@ public synchronized void init(HiveConf hiveConf) {
         keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize));
     backgroundOperationPool.allowCoreThreadTimeOut(true);
     addService(operationManager);
+
+    // Create the operation log root directory, if operation logging is enabled
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
+      initOperationLogRootDir();
+    }
     super.init(hiveConf);
   }

@@ -92,6 +102,36 @@ private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveExc
     ss.applyAuthorizationPolicy();
   }

+  private void initOperationLogRootDir() {
+    operationLogRootDir = new File(
+        hiveConf.getVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION));
+    isOperationLogEnabled = true;
+
+    if (operationLogRootDir.exists() && !operationLogRootDir.isDirectory()) {
+      LOG.warn("The operation log root directory exists, but it is not a directory: " +
+          operationLogRootDir.getAbsolutePath());
+      isOperationLogEnabled = false;
+    }
+
+    if (!operationLogRootDir.exists()) {
+      if (!operationLogRootDir.mkdirs()) {
+        LOG.warn("Unable to create operation log root directory: " +
+            operationLogRootDir.getAbsolutePath());
+        isOperationLogEnabled = false;
+      }
+    }
+
+    if (isOperationLogEnabled) {
+      LOG.info("Operation log root directory is created: " + operationLogRootDir.getAbsolutePath());
+      try {
+        FileUtils.forceDeleteOnExit(operationLogRootDir);
+      } catch (IOException e) {
+        LOG.warn("Failed to schedule cleanup of HS2 operation logging root dir: " +
+            operationLogRootDir.getAbsolutePath(), e);
+      }
+    }
+  }
+
   @Override
   public synchronized void start() {
     super.start();
@@ -110,6 +150,17 @@ public synchronized void stop() {
           " seconds has been exceeded. RUNNING background operations will be shut down", e);
       }
     }
+    cleanupLoggingRootDir();
+  }
+
+  private void cleanupLoggingRootDir() {
+    if (isOperationLogEnabled) {
+      try {
+        FileUtils.forceDelete(operationLogRootDir);
+      } catch (Exception e) {
+        LOG.warn("Failed to cleanup root dir of HS2 logging: " +
+            operationLogRootDir.getAbsolutePath(), e);
+      }
+    }
   }

   public SessionHandle openSession(TProtocolVersion protocol, String username, String password,
@@ -133,6 +184,9 @@ public SessionHandle openSession(TProtocolVersion protocol, String username, Str
     session.setSessionManager(this);
     session.setOperationManager(operationManager);
     session.initialize(sessionConf);
+    if (isOperationLogEnabled) {
+      session.setOperationLogSessionDir(operationLogRootDir);
+    }
     session.open();
     handleToSession.put(session.getSessionHandle(), session);
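
SessionManager only calls initOperationLogRootDir() when hive.server2.logging.operation.enabled is true, so the whole feature can be switched off without touching the location setting. A minimal sketch of driving both knobs programmatically (the location value is just an example):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class OperationLoggingConfSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Checked in SessionManager.init() before initOperationLogRootDir() runs.
        conf.setBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED, true);
        // Read in initOperationLogRootDir(); example path only.
        conf.setVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION, "/tmp/hs2_operation_logs");
      }
    }
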
diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 5c87bcb..b549bc3 100644
--- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -32,16 +32,7 @@
 import org.apache.hive.service.AbstractService;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.TSetIpAddressProcessor;
-import org.apache.hive.service.cli.CLIService;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.OperationStatus;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
 import org.apache.hive.service.cli.session.SessionManager;
 import org.apache.thrift.TException;
 import org.apache.thrift.server.TServer;
@@ -530,7 +521,8 @@ public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
       RowSet rowSet = cliService.fetchResults(
           new OperationHandle(req.getOperationHandle()),
           FetchOrientation.getFetchOrientation(req.getOrientation()),
-          req.getMaxRows());
+          req.getMaxRows(),
+          FetchType.getFetchType(req.getFetchType()));
       resp.setResults(rowSet.toTRowSet());
       resp.setHasMoreRows(false);
       resp.setStatus(OK_STATUS);
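
For non-Java clients the same switch is just field 4 of TFetchResultsReq. A sketch of issuing the raw request the way ThriftCLIServiceClient does below, with fetchType set to 1 for logs (the handle is assumed to come from a prior ExecuteStatement call, and 1000 is an arbitrary page size):

    import org.apache.hive.service.cli.thrift.TCLIService;
    import org.apache.hive.service.cli.thrift.TFetchOrientation;
    import org.apache.hive.service.cli.thrift.TFetchResultsReq;
    import org.apache.hive.service.cli.thrift.TFetchResultsResp;
    import org.apache.hive.service.cli.thrift.TOperationHandle;
    import org.apache.thrift.TException;

    public class RawLogFetchSketch {
      // 'client' can be any TCLIService.Iface, e.g. a TCLIService.Client over a socket.
      static TFetchResultsResp fetchLog(TCLIService.Iface client, TOperationHandle operationHandle)
          throws TException {
        TFetchResultsReq req = new TFetchResultsReq();
        req.setOperationHandle(operationHandle);
        req.setOrientation(TFetchOrientation.FETCH_NEXT);
        req.setMaxRows(1000);
        req.setFetchType((short) 1);  // 0 = query output (default), 1 = operation log
        return client.FetchResults(req);
      }
    }
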
diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
index e3384d3..1af4539 100644
--- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
+++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
@@ -22,18 +22,7 @@
 import java.util.Map;

 import org.apache.hive.service.auth.HiveAuthFactory;
-import org.apache.hive.service.cli.CLIServiceClient;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.OperationState;
-import org.apache.hive.service.cli.OperationStatus;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.RowSetFactory;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
 import org.apache.thrift.TException;

 /**
@@ -377,17 +366,15 @@ public TableSchema getResultSetMetadata(OperationHandle opHandle)
     }
   }

-  /* (non-Javadoc)
-   * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long)
-   */
   @Override
-  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
-      throws HiveSQLException {
+  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows,
+      FetchType fetchType) throws HiveSQLException {
     try {
       TFetchResultsReq req = new TFetchResultsReq();
       req.setOperationHandle(opHandle.toTOperationHandle());
       req.setOrientation(orientation.toTFetchOrientation());
       req.setMaxRows(maxRows);
+      req.setFetchType(fetchType.toTFetchType());
       TFetchResultsResp resp = cliService.FetchResults(req);
       checkStatus(resp.getStatus());
       return RowSetFactory.create(resp.getResults(), opHandle.getProtocolVersion());
@@ -404,7 +391,7 @@ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientatio
   @Override
   public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
     // TODO: set the correct default fetch size
-    return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000);
+    return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000, FetchType.QUERY_OUTPUT);
   }

   @Override
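
Taken together, the client API supports the pattern the new test below exercises: submit the statement asynchronously, then poll the same operation handle with FetchType.LOG while the query runs. A condensed sketch (session setup and error handling omitted; the page size and sleep interval are arbitrary):

    import org.apache.hive.service.cli.FetchOrientation;
    import org.apache.hive.service.cli.FetchType;
    import org.apache.hive.service.cli.OperationHandle;
    import org.apache.hive.service.cli.OperationState;
    import org.apache.hive.service.cli.RowSet;
    import org.apache.hive.service.cli.SessionHandle;
    import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient;

    public class LogTailSketch {
      static void runAndTail(ThriftCLIServiceClient client, SessionHandle session, String sql)
          throws Exception {
        OperationHandle op = client.executeStatementAsync(session, sql, null);
        while (true) {
          // Each fetched row carries one chunk of operation log text in column 0.
          RowSet logRows = client.fetchResults(op, FetchOrientation.FETCH_NEXT, 100, FetchType.LOG);
          for (Object[] row : logRows) {
            System.out.print(row[0]);
          }
          OperationState state = client.getOperationStatus(op).getState();
          if (state == OperationState.FINISHED || state == OperationState.ERROR ||
              state == OperationState.CANCELED || state == OperationState.CLOSED) {
            break;
          }
          Thread.sleep(500);
        }
        // Query output still goes through the old path; QUERY_OUTPUT is the default fetch type.
        RowSet results = client.fetchResults(op);
      }
    }
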
diff --git service/src/test/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java service/src/test/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java
new file mode 100644
index 0000000..102a74a
--- /dev/null
+++ service/src/test/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.cli.operation;
+
+import org.junit.Assert;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.cli.*;
+import org.apache.hive.service.cli.thrift.EmbeddedThriftBinaryCLIService;
+import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+/**
+ * TestOperationLoggingAPI
+ * Test the FetchResults of TFetchType.LOG at the thrift level.
+ */
+public class TestOperationLoggingAPI {
+  private HiveConf hiveConf = new HiveConf();
+  private String tableName = "testOperationLoggingAPI_table";
+  private File dataFile;
+  private ThriftCLIServiceClient client;
+  private SessionHandle sessionHandle;
+  private String sql = "select * from " + tableName;
+  private String[] expectedLogs = {
+    "Parsing command",
+    "Parse Completed",
+    "Starting Semantic Analysis",
+    "Semantic Analysis Completed",
+    "Starting command"
+  };
+
+  /**
+   * Start embedded mode, open a session, and create a table for the test cases to use
+   * @throws Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    dataFile = new File(hiveConf.get("test.data.files"), "kv1.txt");
+    EmbeddedThriftBinaryCLIService service = new EmbeddedThriftBinaryCLIService();
+    service.init(hiveConf);
+    client = new ThriftCLIServiceClient(service);
+    sessionHandle = setupSession();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // Cleanup
+    String queryString = "DROP TABLE " + tableName;
+    client.executeStatement(sessionHandle, queryString, null);
+
+    client.closeSession(sessionHandle);
+  }
+
+  @Test
+  public void testFetchResultsOfLog() throws Exception {
+    // Verify whether the sql operation log is generated and fetched correctly.
+    OperationHandle operationHandle = client.executeStatement(sessionHandle, sql, null);
+    RowSet rowSetLog = client.fetchResults(operationHandle, FetchType.LOG);
+    verifyFetchedLog(rowSetLog);
+  }
+
+  @Test
+  public void testFetchResultsOfLogAsync() throws Exception {
+    // Verify whether the sql operation log is generated and fetched correctly in async mode.
+    OperationHandle operationHandle = client.executeStatementAsync(sessionHandle, sql, null);
+
+    // Poll on the operation status till the query is completed
+    boolean isQueryRunning = true;
+    long pollTimeout = System.currentTimeMillis() + 100000;
+    OperationStatus opStatus;
+    OperationState state = null;
+
+    while (isQueryRunning) {
+      // Break if polling times out
+      if (System.currentTimeMillis() > pollTimeout) {
+        break;
+      }
+      opStatus = client.getOperationStatus(operationHandle);
+      Assert.assertNotNull(opStatus);
+      state = opStatus.getState();
+
+      if (state == OperationState.CANCELED ||
+          state == OperationState.CLOSED ||
+          state == OperationState.FINISHED ||
+          state == OperationState.ERROR) {
+        isQueryRunning = false;
+      }
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals("Query should be finished", OperationState.FINISHED, state);
+
+    // The sql should be completed now.
+    RowSet rowSet = client.fetchResults(operationHandle, FetchType.LOG);
+    verifyFetchedLog(rowSet);
+  }
+
+  @Test
+  public void testFetchResultsOfLogWithOrientation() throws Exception {
+    // (FETCH_FIRST) execute a sql, and fetch its sql operation log as the expected value
+    OperationHandle operationHandle = client.executeStatement(sessionHandle, sql, null);
+    RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000,
+        FetchType.LOG);
+    int expectedLogLength = rowSetLog.numRows();
+
+    // (FETCH_NEXT) execute the same sql again,
+    // and fetch the sql operation log with FETCH_NEXT orientation
+    OperationHandle operationHandleWithOrientation = client.executeStatement(sessionHandle, sql,
+        null);
+    RowSet rowSetLogWithOrientation;
+    int logLength = 0;
+    int maxRows = calculateProperMaxRows(expectedLogLength);
+    do {
+      rowSetLogWithOrientation = client.fetchResults(operationHandleWithOrientation,
+          FetchOrientation.FETCH_NEXT, maxRows, FetchType.LOG);
+      logLength += rowSetLogWithOrientation.numRows();
+    } while (rowSetLogWithOrientation.numRows() == maxRows);
+    Assert.assertEquals(expectedLogLength, logLength);
+
+    // (FETCH_FIRST) fetch again from the same operation handle with FETCH_FIRST orientation
+    rowSetLogWithOrientation = client.fetchResults(operationHandleWithOrientation,
+        FetchOrientation.FETCH_FIRST, 1000, FetchType.LOG);
+    verifyFetchedLog(rowSetLogWithOrientation);
+  }
+
+  @Test
+  public void testFetchResultsOfLogCleanup() throws Exception {
+    // Verify cleanup functionality.
+    // Open a new session, since this case needs to close the session in the end.
+    SessionHandle sessionHandleCleanup = setupSession();
+
+    // prepare
+    OperationHandle operationHandle = client.executeStatement(sessionHandleCleanup, sql, null);
+    RowSet rowSetLog = client.fetchResults(operationHandle, FetchType.LOG);
+    verifyFetchedLog(rowSetLog);
+
+    File sessionLogDir = new File(
+        hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION) +
+        File.separator + sessionHandleCleanup.getHandleIdentifier());
+    File operationLogFile = new File(sessionLogDir, operationHandle.getHandleIdentifier().toString());
+
+    // Check whether an exception is thrown when fetching the log from a closed operation.
+    client.closeOperation(operationHandle);
+    try {
+      client.fetchResults(operationHandle, FetchType.LOG);
+      Assert.fail("Fetch should fail");
+    } catch (HiveSQLException e) {
+      // expected
+    }
+
+    // Check whether the operation log file is deleted.
+    if (operationLogFile.exists()) {
+      Assert.fail("Operation log file should be deleted.");
+    }
+
+    // Check whether the session log dir is deleted after the session is closed.
+    client.closeSession(sessionHandleCleanup);
+    if (sessionLogDir.exists()) {
+      Assert.fail("Session log dir should be deleted.");
+    }
+  }
+
+  private SessionHandle setupSession() throws Exception {
+    // Open a session
+    SessionHandle sessionHandle = client.openSession(null, null, null);
+
+    // Change lock manager to embedded mode
+    String queryString = "SET hive.lock.manager=" +
+        "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager";
+    client.executeStatement(sessionHandle, queryString, null);
+
+    // Drop the table if it exists
+    queryString = "DROP TABLE IF EXISTS " + tableName;
+    client.executeStatement(sessionHandle, queryString, null);
+
+    // Create a test table
+    queryString = "create table " + tableName + " (key int, value string)";
+    client.executeStatement(sessionHandle, queryString, null);
+
+    // Load data
+    queryString = "load data local inpath '" + dataFile + "' into table " + tableName;
+    client.executeStatement(sessionHandle, queryString, null);
+
+    // Precondition check: verify whether the table is created and the data is fetched correctly.
+    OperationHandle operationHandle = client.executeStatement(sessionHandle, sql, null);
+    RowSet rowSetResult = client.fetchResults(operationHandle);
+    Assert.assertEquals(500, rowSetResult.numRows());
+    Assert.assertEquals(238, rowSetResult.iterator().next()[0]);
+    Assert.assertEquals("val_238", rowSetResult.iterator().next()[1]);
+
+    return sessionHandle;
+  }
+
+  // Since the log length of the sql operation may vary during Hive development,
+  // calculate a proper maxRows.
+  private int calculateProperMaxRows(int len) {
+    if (len < 10) {
+      return 1;
+    } else if (len < 100) {
+      return 10;
+    } else {
+      return 100;
+    }
+  }
+
+  private void verifyFetchedLog(RowSet rowSet) {
+    StringBuilder stringBuilder = new StringBuilder();
+
+    for (Object[] row : rowSet) {
+      stringBuilder.append(row[0]);
+    }
+
+    String logs = stringBuilder.toString();
+    for (String log : expectedLogs) {
+      Assert.assertTrue("Fetched logs should contain [" + log + "]", logs.contains(log));
+    }
+  }
+}