commit 87be20fb0c9f83148926d99063cfb4bdf7697c03 Author: Bharath Krishna Date: Mon Mar 26 14:15:10 2018 -0700 HIVE-14388 : Add number of rows inserted message after insert command in Beeline diff --git jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 06542cee02e5dc4696f2621bb45cc4f24c67dfda..7c822485902f7fe002181757d4e1b1e6b45d0f53 100644 --- jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -708,8 +708,8 @@ public int getUpdateCount() throws SQLException { * client might end up using executeAsync and then call this to check if the query run is * finished. */ - waitForOperationToComplete(); - return -1; + long numRows = waitForResultSetStatus().getNumRows(); + return (int)numRows; } /* diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 75f928b69d3d7b206564216d24be450848a1fe8a..91313ac32c9e175e71ed0f56b254f1f59eb5233d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -2205,11 +2205,14 @@ private void execute() throws CommandProcessorResponse { Map stats = SessionState.get().getMapRedStats(); if (stats != null && !stats.isEmpty()) { long totalCpu = 0; + long numRows = 0; console.printInfo("MapReduce Jobs Launched: "); for (Map.Entry entry : stats.entrySet()) { console.printInfo("Stage-" + entry.getKey() + ": " + entry.getValue()); totalCpu += entry.getValue().getCpuMSec(); + numRows += entry.getValue().getNumRows(); } + queryState.setNumRows(numRows); console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu)); } lDrvState.stateLock.lock(); diff --git ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java index cf9c2273159c0d779ea90ad029613678fb0967a6..86ed9336dc2e82004aceb57851dbf1e7b6881667 100644 --- ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java +++ ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java @@ -38,6 +38,16 @@ String jobId; + public long getNumRows() { + return numRows; + } + + public void setNumRows(long numRows) { + this.numRows = numRows; + } + + private long numRows; + public MapRedStats(int numMap, int numReduce, long cpuMSec, boolean ifSuccess, String jobId) { this.numMap = numMap; this.numReduce = numReduce; diff --git ql/src/java/org/apache/hadoop/hive/ql/QueryState.java ql/src/java/org/apache/hadoop/hive/ql/QueryState.java index 706c9ffa48b9c3b4a6fdaae78bab1d39c3d0efda..ff48a67a7e1cb65045be9663182738c77eaa5df5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/QueryState.java +++ ql/src/java/org/apache/hadoop/hive/ql/QueryState.java @@ -49,6 +49,7 @@ */ private HiveTxnManager txnManager; + private long numRows; /** * Private constructor, use QueryState.Builder instead. * @param conf The query specific configuration object @@ -100,6 +101,13 @@ public void setTxnManager(HiveTxnManager txnManager) { this.txnManager = txnManager; } + public long getNumRows() { + return numRows; + } + + public void setNumRows(long numRows) { + this.numRows = numRows; + } /** * Builder to instantiate the QueryState object. */ diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index c084fa054cb771bfdb033d244935713e3c7eb874..67ae43e410bac37477d9ab2cc8861cd4604d3d2a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair; import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; @@ -587,6 +588,9 @@ protected void initializeOp(Configuration hconf) throws HiveException { logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS); statsMap.put(getCounterName(Counter.RECORDS_OUT), row_count); + if(conf.getTableInfo() != null && conf.getTableInfo().getTableName() != null) { + statsMap.put("TOTAL_TABLE_ROWS_AFFECTED_FS",row_count); + } } catch (HiveException e) { throw e; } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index c28ef99621e67a5b16bf02a1112df2ec993c4f79..e2415d500cf88575c5f2409577dc9a23271e7030 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.lib.Node; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index eb3a11a8815e35dee825edb7d3246c8ecef6b0a7..60131f99bf2c7f20c360f06863af2c048c4cc7f3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -423,6 +423,8 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException, LockEx // update based on the final value of the counters updateCounters(ctrs, rj); + long numRows =mapRedStats.getCounters().findCounter("HIVE","TOTAL_TABLE_ROWS_AFFECTED_FS").getValue(); + mapRedStats.setNumRows(numRows); SessionState ss = SessionState.get(); if (ss != null) { diff --git service-rpc/if/TCLIService.thrift service-rpc/if/TCLIService.thrift index 30f8af7f3e6e0598b410498782900ac27971aef0..5f1961b4e52fe3e7a89c868e0d36d48b031660e4 100644 --- service-rpc/if/TCLIService.thrift +++ service-rpc/if/TCLIService.thrift @@ -1070,6 +1070,7 @@ struct TGetOperationStatusResp { 10: optional TProgressUpdateResp progressUpdateResponse + 11: optional i64 numRows } diff --git service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp index b2b62c71492b844f4439367364c5c81aa62f3908..fcf4bee8d1f6bc12b594be5fb8764509a034912c 100644 --- service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp +++ service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp @@ -8571,6 +8571,11 @@ void TGetOperationStatusResp::__set_progressUpdateResponse(const TProgressUpdate __isset.progressUpdateResponse = true; } +void TGetOperationStatusResp::__set_numRows(const int64_t val) { + this->numRows = val; +__isset.numRows = true; +} + uint32_t TGetOperationStatusResp::read(::apache::thrift::protocol::TProtocol* iprot) { apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); @@ -8675,6 +8680,14 @@ uint32_t TGetOperationStatusResp::read(::apache::thrift::protocol::TProtocol* ip xfer += iprot->skip(ftype); } break; + case 11: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->numRows); + this->__isset.numRows = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -8743,6 +8756,11 @@ uint32_t TGetOperationStatusResp::write(::apache::thrift::protocol::TProtocol* o xfer += this->progressUpdateResponse.write(oprot); xfer += oprot->writeFieldEnd(); } + if (this->__isset.numRows) { + xfer += oprot->writeFieldBegin("numRows", ::apache::thrift::protocol::T_I64, 11); + xfer += oprot->writeI64(this->numRows); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -8760,6 +8778,7 @@ void swap(TGetOperationStatusResp &a, TGetOperationStatusResp &b) { swap(a.operationCompleted, b.operationCompleted); swap(a.hasResultSet, b.hasResultSet); swap(a.progressUpdateResponse, b.progressUpdateResponse); + swap(a.numRows, b.numRows); swap(a.__isset, b.__isset); } @@ -8774,6 +8793,7 @@ TGetOperationStatusResp::TGetOperationStatusResp(const TGetOperationStatusResp& operationCompleted = other283.operationCompleted; hasResultSet = other283.hasResultSet; progressUpdateResponse = other283.progressUpdateResponse; + numRows = other283.numRows; __isset = other283.__isset; } TGetOperationStatusResp& TGetOperationStatusResp::operator=(const TGetOperationStatusResp& other284) { @@ -8787,6 +8807,7 @@ TGetOperationStatusResp& TGetOperationStatusResp::operator=(const TGetOperationS operationCompleted = other284.operationCompleted; hasResultSet = other284.hasResultSet; progressUpdateResponse = other284.progressUpdateResponse; + numRows = other284.numRows; __isset = other284.__isset; return *this; } @@ -8803,6 +8824,7 @@ void TGetOperationStatusResp::printTo(std::ostream& out) const { out << ", " << "operationCompleted="; (__isset.operationCompleted ? (out << to_string(operationCompleted)) : (out << "")); out << ", " << "hasResultSet="; (__isset.hasResultSet ? (out << to_string(hasResultSet)) : (out << "")); out << ", " << "progressUpdateResponse="; (__isset.progressUpdateResponse ? (out << to_string(progressUpdateResponse)) : (out << "")); + out << ", " << "numRows="; (__isset.numRows ? (out << to_string(numRows)) : (out << "")); out << ")"; } diff --git service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h index 4321ad6d3c966d30f7a69552f91804cf2f1ba6c4..d52b6b51420f60c0b98e5126ab3de741b0b14697 100644 --- service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h +++ service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h @@ -3840,7 +3840,7 @@ inline std::ostream& operator<<(std::ostream& out, const TGetOperationStatusReq& } typedef struct _TGetOperationStatusResp__isset { - _TGetOperationStatusResp__isset() : operationState(false), sqlState(false), errorCode(false), errorMessage(false), taskStatus(false), operationStarted(false), operationCompleted(false), hasResultSet(false), progressUpdateResponse(false) {} + _TGetOperationStatusResp__isset() : operationState(false), sqlState(false), errorCode(false), errorMessage(false), taskStatus(false), operationStarted(false), operationCompleted(false), hasResultSet(false), progressUpdateResponse(false), numRows(false) {} bool operationState :1; bool sqlState :1; bool errorCode :1; @@ -3850,6 +3850,7 @@ typedef struct _TGetOperationStatusResp__isset { bool operationCompleted :1; bool hasResultSet :1; bool progressUpdateResponse :1; + bool numRows :1; } _TGetOperationStatusResp__isset; class TGetOperationStatusResp { @@ -3857,7 +3858,7 @@ class TGetOperationStatusResp { TGetOperationStatusResp(const TGetOperationStatusResp&); TGetOperationStatusResp& operator=(const TGetOperationStatusResp&); - TGetOperationStatusResp() : operationState((TOperationState::type)0), sqlState(), errorCode(0), errorMessage(), taskStatus(), operationStarted(0), operationCompleted(0), hasResultSet(0) { + TGetOperationStatusResp() : operationState((TOperationState::type)0), sqlState(), errorCode(0), errorMessage(), taskStatus(), operationStarted(0), operationCompleted(0), hasResultSet(0), numRows(0) { } virtual ~TGetOperationStatusResp() throw(); @@ -3871,6 +3872,7 @@ class TGetOperationStatusResp { int64_t operationCompleted; bool hasResultSet; TProgressUpdateResp progressUpdateResponse; + int64_t numRows; _TGetOperationStatusResp__isset __isset; @@ -3894,6 +3896,8 @@ class TGetOperationStatusResp { void __set_progressUpdateResponse(const TProgressUpdateResp& val); + void __set_numRows(const int64_t val); + bool operator == (const TGetOperationStatusResp & rhs) const { if (!(status == rhs.status)) @@ -3934,6 +3938,10 @@ class TGetOperationStatusResp { return false; else if (__isset.progressUpdateResponse && !(progressUpdateResponse == rhs.progressUpdateResponse)) return false; + if (__isset.numRows != rhs.__isset.numRows) + return false; + else if (__isset.numRows && !(numRows == rhs.numRows)) + return false; return true; } bool operator != (const TGetOperationStatusResp &rhs) const { diff --git service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java index 15e8220eb3eb12b72c7b64029410dced33bc0d72..4035f503662a8a57af6b2e2957eed8864ebb07fd 100644 --- service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java +++ service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java @@ -48,6 +48,7 @@ private static final org.apache.thrift.protocol.TField OPERATION_COMPLETED_FIELD_DESC = new org.apache.thrift.protocol.TField("operationCompleted", org.apache.thrift.protocol.TType.I64, (short)8); private static final org.apache.thrift.protocol.TField HAS_RESULT_SET_FIELD_DESC = new org.apache.thrift.protocol.TField("hasResultSet", org.apache.thrift.protocol.TType.BOOL, (short)9); private static final org.apache.thrift.protocol.TField PROGRESS_UPDATE_RESPONSE_FIELD_DESC = new org.apache.thrift.protocol.TField("progressUpdateResponse", org.apache.thrift.protocol.TType.STRUCT, (short)10); + private static final org.apache.thrift.protocol.TField NUM_ROWS_FIELD_DESC = new org.apache.thrift.protocol.TField("numRows", org.apache.thrift.protocol.TType.I64, (short)11); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -65,6 +66,7 @@ private long operationCompleted; // optional private boolean hasResultSet; // optional private TProgressUpdateResp progressUpdateResponse; // optional + private long numRows; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -81,7 +83,8 @@ OPERATION_STARTED((short)7, "operationStarted"), OPERATION_COMPLETED((short)8, "operationCompleted"), HAS_RESULT_SET((short)9, "hasResultSet"), - PROGRESS_UPDATE_RESPONSE((short)10, "progressUpdateResponse"); + PROGRESS_UPDATE_RESPONSE((short)10, "progressUpdateResponse"), + NUM_ROWS((short)11, "numRows"); private static final Map byName = new HashMap(); @@ -116,6 +119,8 @@ public static _Fields findByThriftId(int fieldId) { return HAS_RESULT_SET; case 10: // PROGRESS_UPDATE_RESPONSE return PROGRESS_UPDATE_RESPONSE; + case 11: // NUM_ROWS + return NUM_ROWS; default: return null; } @@ -160,8 +165,9 @@ public String getFieldName() { private static final int __OPERATIONSTARTED_ISSET_ID = 1; private static final int __OPERATIONCOMPLETED_ISSET_ID = 2; private static final int __HASRESULTSET_ISSET_ID = 3; + private static final int __NUMROWS_ISSET_ID = 4; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.OPERATION_STATE,_Fields.SQL_STATE,_Fields.ERROR_CODE,_Fields.ERROR_MESSAGE,_Fields.TASK_STATUS,_Fields.OPERATION_STARTED,_Fields.OPERATION_COMPLETED,_Fields.HAS_RESULT_SET,_Fields.PROGRESS_UPDATE_RESPONSE}; + private static final _Fields optionals[] = {_Fields.OPERATION_STATE,_Fields.SQL_STATE,_Fields.ERROR_CODE,_Fields.ERROR_MESSAGE,_Fields.TASK_STATUS,_Fields.OPERATION_STARTED,_Fields.OPERATION_COMPLETED,_Fields.HAS_RESULT_SET,_Fields.PROGRESS_UPDATE_RESPONSE,_Fields.NUM_ROWS}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -185,6 +191,8 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); tmpMap.put(_Fields.PROGRESS_UPDATE_RESPONSE, new org.apache.thrift.meta_data.FieldMetaData("progressUpdateResponse", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT , "TProgressUpdateResp"))); + tmpMap.put(_Fields.NUM_ROWS, new org.apache.thrift.meta_data.FieldMetaData("numRows", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TGetOperationStatusResp.class, metaDataMap); } @@ -226,6 +234,7 @@ public TGetOperationStatusResp(TGetOperationStatusResp other) { if (other.isSetProgressUpdateResponse()) { this.progressUpdateResponse = other.progressUpdateResponse; } + this.numRows = other.numRows; } public TGetOperationStatusResp deepCopy() { @@ -248,6 +257,8 @@ public void clear() { setHasResultSetIsSet(false); this.hasResultSet = false; this.progressUpdateResponse = null; + setNumRowsIsSet(false); + this.numRows = 0; } public TStatus getStatus() { @@ -484,6 +495,28 @@ public void setProgressUpdateResponseIsSet(boolean value) { } } + public long getNumRows() { + return this.numRows; + } + + public void setNumRows(long numRows) { + this.numRows = numRows; + setNumRowsIsSet(true); + } + + public void unsetNumRows() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __NUMROWS_ISSET_ID); + } + + /** Returns true if field numRows is set (has been assigned a value) and false otherwise */ + public boolean isSetNumRows() { + return EncodingUtils.testBit(__isset_bitfield, __NUMROWS_ISSET_ID); + } + + public void setNumRowsIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __NUMROWS_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case STATUS: @@ -566,6 +599,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case NUM_ROWS: + if (value == null) { + unsetNumRows(); + } else { + setNumRows((Long)value); + } + break; + } } @@ -601,6 +642,9 @@ public Object getFieldValue(_Fields field) { case PROGRESS_UPDATE_RESPONSE: return getProgressUpdateResponse(); + case NUM_ROWS: + return getNumRows(); + } throw new IllegalStateException(); } @@ -632,6 +676,8 @@ public boolean isSet(_Fields field) { return isSetHasResultSet(); case PROGRESS_UPDATE_RESPONSE: return isSetProgressUpdateResponse(); + case NUM_ROWS: + return isSetNumRows(); } throw new IllegalStateException(); } @@ -739,6 +785,15 @@ public boolean equals(TGetOperationStatusResp that) { return false; } + boolean this_present_numRows = true && this.isSetNumRows(); + boolean that_present_numRows = true && that.isSetNumRows(); + if (this_present_numRows || that_present_numRows) { + if (!(this_present_numRows && that_present_numRows)) + return false; + if (this.numRows != that.numRows) + return false; + } + return true; } @@ -796,6 +851,11 @@ public int hashCode() { if (present_progressUpdateResponse) list.add(progressUpdateResponse); + boolean present_numRows = true && (isSetNumRows()); + list.add(present_numRows); + if (present_numRows) + list.add(numRows); + return list.hashCode(); } @@ -907,6 +967,16 @@ public int compareTo(TGetOperationStatusResp other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetNumRows()).compareTo(other.isSetNumRows()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetNumRows()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.numRows, other.numRows); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -1008,6 +1078,12 @@ public String toString() { } first = false; } + if (isSetNumRows()) { + if (!first) sb.append(", "); + sb.append("numRows:"); + sb.append(this.numRows); + first = false; + } sb.append(")"); return sb.toString(); } @@ -1142,6 +1218,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TGetOperationStatus org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 11: // NUM_ROWS + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.numRows = iprot.readI64(); + struct.setNumRowsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1215,6 +1299,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TGetOperationStatu oprot.writeFieldEnd(); } } + if (struct.isSetNumRows()) { + oprot.writeFieldBegin(NUM_ROWS_FIELD_DESC); + oprot.writeI64(struct.numRows); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1261,7 +1350,10 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatus if (struct.isSetProgressUpdateResponse()) { optionals.set(8); } - oprot.writeBitSet(optionals, 9); + if (struct.isSetNumRows()) { + optionals.set(9); + } + oprot.writeBitSet(optionals, 10); if (struct.isSetOperationState()) { oprot.writeI32(struct.operationState.getValue()); } @@ -1289,6 +1381,9 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatus if (struct.isSetProgressUpdateResponse()) { struct.progressUpdateResponse.write(oprot); } + if (struct.isSetNumRows()) { + oprot.writeI64(struct.numRows); + } } @Override @@ -1297,7 +1392,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatusR struct.status = new TStatus(); struct.status.read(iprot); struct.setStatusIsSet(true); - BitSet incoming = iprot.readBitSet(9); + BitSet incoming = iprot.readBitSet(10); if (incoming.get(0)) { struct.operationState = org.apache.hive.service.rpc.thrift.TOperationState.findByValue(iprot.readI32()); struct.setOperationStateIsSet(true); @@ -1335,6 +1430,10 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatusR struct.progressUpdateResponse.read(iprot); struct.setProgressUpdateResponseIsSet(true); } + if (incoming.get(9)) { + struct.numRows = iprot.readI64(); + struct.setNumRowsIsSet(true); + } } } diff --git service-rpc/src/gen/thrift/gen-php/Types.php service-rpc/src/gen/thrift/gen-php/Types.php index abb7c1ff3a2c8b72dc97689758266b675880e32b..72fa40b71ecb2abe7dd53ad68e50376da2e08cb7 100644 --- service-rpc/src/gen/thrift/gen-php/Types.php +++ service-rpc/src/gen/thrift/gen-php/Types.php @@ -8351,6 +8351,10 @@ class TGetOperationStatusResp { * @var \TProgressUpdateResp */ public $progressUpdateResponse = null; + /** + * @var int + */ + public $numRows = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -8397,6 +8401,10 @@ class TGetOperationStatusResp { 'type' => TType::STRUCT, 'class' => '\TProgressUpdateResp', ), + 11 => array( + 'var' => 'numRows', + 'type' => TType::I64, + ), ); } if (is_array($vals)) { @@ -8430,6 +8438,9 @@ class TGetOperationStatusResp { if (isset($vals['progressUpdateResponse'])) { $this->progressUpdateResponse = $vals['progressUpdateResponse']; } + if (isset($vals['numRows'])) { + $this->numRows = $vals['numRows']; + } } } @@ -8524,6 +8535,13 @@ class TGetOperationStatusResp { $xfer += $input->skip($ftype); } break; + case 11: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->numRows); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -8593,6 +8611,11 @@ class TGetOperationStatusResp { $xfer += $this->progressUpdateResponse->write($output); $xfer += $output->writeFieldEnd(); } + if ($this->numRows !== null) { + $xfer += $output->writeFieldBegin('numRows', TType::I64, 11); + $xfer += $output->writeI64($this->numRows); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py index 0f8fd0745be0f4ed9e96b7bbe0f092d03649bcdf..cf60ebcad87c5469ead25f5aec288cec58685420 100644 --- service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py +++ service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py @@ -6303,6 +6303,7 @@ class TGetOperationStatusResp: - operationCompleted - hasResultSet - progressUpdateResponse + - numRows """ thrift_spec = ( @@ -6317,9 +6318,10 @@ class TGetOperationStatusResp: (8, TType.I64, 'operationCompleted', None, None, ), # 8 (9, TType.BOOL, 'hasResultSet', None, None, ), # 9 (10, TType.STRUCT, 'progressUpdateResponse', (TProgressUpdateResp, TProgressUpdateResp.thrift_spec), None, ), # 10 + (11, TType.I64, 'numRows', None, None, ), # 11 ) - def __init__(self, status=None, operationState=None, sqlState=None, errorCode=None, errorMessage=None, taskStatus=None, operationStarted=None, operationCompleted=None, hasResultSet=None, progressUpdateResponse=None,): + def __init__(self, status=None, operationState=None, sqlState=None, errorCode=None, errorMessage=None, taskStatus=None, operationStarted=None, operationCompleted=None, hasResultSet=None, progressUpdateResponse=None, numRows=None,): self.status = status self.operationState = operationState self.sqlState = sqlState @@ -6330,6 +6332,7 @@ def __init__(self, status=None, operationState=None, sqlState=None, errorCode=No self.operationCompleted = operationCompleted self.hasResultSet = hasResultSet self.progressUpdateResponse = progressUpdateResponse + self.numRows = numRows def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -6392,6 +6395,11 @@ def read(self, iprot): self.progressUpdateResponse.read(iprot) else: iprot.skip(ftype) + elif fid == 11: + if ftype == TType.I64: + self.numRows = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -6442,6 +6450,10 @@ def write(self, oprot): oprot.writeFieldBegin('progressUpdateResponse', TType.STRUCT, 10) self.progressUpdateResponse.write(oprot) oprot.writeFieldEnd() + if self.numRows is not None: + oprot.writeFieldBegin('numRows', TType.I64, 11) + oprot.writeI64(self.numRows) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -6463,6 +6475,7 @@ def __hash__(self): value = (value * 31) ^ hash(self.operationCompleted) value = (value * 31) ^ hash(self.hasResultSet) value = (value * 31) ^ hash(self.progressUpdateResponse) + value = (value * 31) ^ hash(self.numRows) return value def __repr__(self): diff --git service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb index 60183dae9e9927bd09a9676e49eeb4aea2401737..25aee9ebb16383f1aa7817a49ee3cb1a18aa8623 100644 --- service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb +++ service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb @@ -1624,6 +1624,7 @@ class TGetOperationStatusResp OPERATIONCOMPLETED = 8 HASRESULTSET = 9 PROGRESSUPDATERESPONSE = 10 + NUMROWS = 11 FIELDS = { STATUS => {:type => ::Thrift::Types::STRUCT, :name => 'status', :class => ::TStatus}, @@ -1635,7 +1636,8 @@ class TGetOperationStatusResp OPERATIONSTARTED => {:type => ::Thrift::Types::I64, :name => 'operationStarted', :optional => true}, OPERATIONCOMPLETED => {:type => ::Thrift::Types::I64, :name => 'operationCompleted', :optional => true}, HASRESULTSET => {:type => ::Thrift::Types::BOOL, :name => 'hasResultSet', :optional => true}, - PROGRESSUPDATERESPONSE => {:type => ::Thrift::Types::STRUCT, :name => 'progressUpdateResponse', :class => ::TProgressUpdateResp, :optional => true} + PROGRESSUPDATERESPONSE => {:type => ::Thrift::Types::STRUCT, :name => 'progressUpdateResponse', :class => ::TProgressUpdateResp, :optional => true}, + NUMROWS => {:type => ::Thrift::Types::I64, :name => 'numRows', :optional => true} } def struct_fields; FIELDS; end diff --git service/src/java/org/apache/hive/service/cli/CLIService.java service/src/java/org/apache/hive/service/cli/CLIService.java index c9914ba9bf8653cbcbca7d6612e98a64058c0fcc..07d3f63213b4dde945107f31ee05914e35fea12a 100644 --- service/src/java/org/apache/hive/service/cli/CLIService.java +++ service/src/java/org/apache/hive/service/cli/CLIService.java @@ -473,6 +473,8 @@ public OperationStatus getOperationStatus(OperationHandle opHandle, boolean getP } OperationStatus opStatus = operation.getStatus(); LOG.debug(opHandle + ": getOperationStatus()"); + long numRows = operation.getQueryState().getNumRows(); + opStatus.setNumRows(numRows); opStatus.setJobProgressUpdate(progressUpdateLog(getProgressUpdate, operation, conf)); return opStatus; } diff --git service/src/java/org/apache/hive/service/cli/OperationStatus.java service/src/java/org/apache/hive/service/cli/OperationStatus.java index 52cc3ae4f26b990b3e4edb52d9de85b3cc25f269..bb614a5d6f9c7b2d2510ac8f2007eb7bb9ff73e5 100644 --- service/src/java/org/apache/hive/service/cli/OperationStatus.java +++ service/src/java/org/apache/hive/service/cli/OperationStatus.java @@ -32,6 +32,16 @@ private final HiveSQLException operationException; private JobProgressUpdate jobProgressUpdate; + public long getNumRows() { + return numRows; + } + + public void setNumRows(long numRows) { + this.numRows = numRows; + } + + private long numRows; + public OperationStatus(OperationState state, String taskStatus, long operationStarted, long operationCompleted, boolean hasResultSet, HiveSQLException operationException) { this.state = state; this.taskStatus = taskStatus; diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java index 3706c72abc77ac8bd77947cc1c5d084ddf965e9f..41033e6d3a0360a75c1dfa2e235559522eef6927 100644 --- service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -183,7 +183,9 @@ public long getOperationTimeout() { public void setOperationTimeout(long operationTimeout) { this.operationTimeout = operationTimeout; } - + public QueryState getQueryState() { + return queryState; + } protected void setOperationException(HiveSQLException operationException) { this.operationException = operationException; } diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index c64c99120ad21ee98af81ec6659a2722e3e1d1c7..b15555bc7a5f627aea68e4284d082efa5d4daf98 100644 --- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; +import org.apache.hive.service.cli.*; import org.apache.hive.service.rpc.thrift.TSetClientInfoReq; import org.apache.hive.service.rpc.thrift.TSetClientInfoResp; @@ -43,21 +44,6 @@ import org.apache.hive.service.auth.HiveAuthConstants; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.TSetIpAddressProcessor; -import org.apache.hive.service.cli.CLIService; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.FetchType; -import org.apache.hive.service.cli.GetInfoType; -import org.apache.hive.service.cli.GetInfoValue; -import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.JobProgressUpdate; -import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.OperationStatus; -import org.apache.hive.service.cli.OperationType; -import org.apache.hive.service.cli.ProgressMonitorStatusMapper; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.SessionHandle; -import org.apache.hive.service.cli.TableSchema; -import org.apache.hive.service.cli.TezProgressMonitorStatusMapper; import org.apache.hive.service.cli.operation.Operation; import org.apache.hive.service.cli.session.SessionManager; import org.apache.hive.service.rpc.thrift.TCLIService; @@ -691,6 +677,11 @@ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) th try { OperationStatus operationStatus = cliService.getOperationStatus(operationHandle, req.isGetProgressUpdate()); + + if(operationStatus.getState().equals(OperationState.FINISHED)) { + long numRows = operationStatus.getNumRows(); + resp.setNumRows(numRows); + } resp.setOperationState(operationStatus.getState().toTOperationState()); resp.setErrorMessage(operationStatus.getState().getErrorMessage()); HiveSQLException opException = operationStatus.getOperationException();