diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index db942b0..841886e 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2095,6 +2095,11 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { HIVE_SERVER2_THRIFT_CLIENT_PASSWORD("hive.server2.thrift.client.password", "anonymous","Password to use against " + "thrift client"), + // webhdfs bypass setting for fetching data + HIVE_SERVER2_WEBHDFS_BYPASS_ENABLED("hive.server2.webhdfs.bypass.enabled", false, + "When set to true, the client will download result data directly via WebHDFS when possible." + ), + HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile", "Comma separated list of non-SQL Hive commands users are authorized to execute"), HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list", diff --git jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java index 245c6a3..559ae76 100644 --- jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java +++ jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java @@ -20,6 +20,10 @@ import static org.apache.hive.service.cli.thrift.TCLIServiceConstants.TYPE_NAMES; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.UnsupportedEncodingException; +import java.net.URI; import java.sql.Connection; import java.sql.ResultSet; import java.sql.ResultSetMetaData; @@ -28,11 +32,27 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Properties; import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hive.service.cli.ColumnDescriptor; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; @@ -76,6 +96,8 @@ private boolean emptyResultSet = false; private boolean isScrollable = false; private boolean fetchFirst = false; + private String finalDirUri; + private AbstractSerDe serde = null; private final TProtocolVersion protocol; @@ -101,6 +123,7 @@ private boolean emptyResultSet = false; private boolean isScrollable = false; private ReentrantLock transportLock = null; + private String finalDirUri; public Builder(Statement statement) throws SQLException { this.statement = statement; @@ -174,6 +197,11 @@ public Builder setTransportLock(ReentrantLock transportLock) { return this; } + public Builder setFinalDirUri(String finalDirUri) { + this.finalDirUri = finalDirUri; + return this; + } + public 
HiveQueryResultSet build() throws SQLException { return new HiveQueryResultSet(this); } @@ -189,6 +217,7 @@ protected HiveQueryResultSet(Builder builder) throws SQLException { this.stmtHandle = builder.stmtHandle; this.sessHandle = builder.sessHandle; this.fetchSize = builder.fetchSize; + this.finalDirUri = builder.finalDirUri; columnNames = new ArrayList(); normalizedColumnNames = new ArrayList(); columnTypes = new ArrayList(); @@ -357,15 +386,38 @@ public boolean next() throws SQLException { fetchFirst = false; } if (fetchedRows == null || !fetchedRowsItr.hasNext()) { - TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, - orientation, fetchSize); - TFetchResultsResp fetchResp; - fetchResp = client.FetchResults(fetchReq); - Utils.verifySuccessWithInfo(fetchResp.getStatus()); - - TRowSet results = fetchResp.getResults(); - fetchedRows = RowSetFactory.create(results, protocol); - fetchedRowsItr = fetchedRows.iterator(); + if (finalDirUri != null) { + Configuration conf = new Configuration(); + FSDataInputStream in; + List rows = new ArrayList<>(); + FileSystem fs = FileSystem.get(URI.create(finalDirUri), conf); + FileStatus[] fStatus = fs.listStatus(new Path(finalDirUri)); + for (FileStatus file : fStatus) { + if (file.isFile()) { + in = fs.open(file.getPath()); + BufferedReader br = new BufferedReader(new InputStreamReader(in)); + String line; + while ((line = br.readLine()) != null) { + rows.add(line); + } + } + } + + TableSchema tb = getSchema(); + fetchedRows = RowSetFactory.create(tb, protocol); + decodeFromString(rows, fetchedRows); + fetchedRowsItr = fetchedRows.iterator(); + } else { + TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, + orientation, + fetchSize); + TFetchResultsResp fetchResp; + fetchResp = client.FetchResults(fetchReq); + Utils.verifySuccessWithInfo(fetchResp.getStatus()); + TRowSet results = fetchResp.getResults(); + fetchedRows = RowSetFactory.create(results, protocol); + fetchedRowsItr = fetchedRows.iterator(); + } } String rowStr = ""; @@ -390,6 +442,75 @@ public boolean next() throws SQLException { return true; } + public RowSet decodeFromString(List rows, RowSet rowSet) + throws SQLException, SerDeException{ + getSerDe(); + StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector(); + List fieldRefs = soi.getAllStructFieldRefs(); + + Object[] deserializedFields = new Object[fieldRefs.size()]; + Object rowObj; + ObjectInspector fieldOI; + + for (Object rowString : rows) { + try { + rowObj = serde.deserialize(new BytesWritable(((String)rowString).getBytes("UTF-8"))); + } catch (UnsupportedEncodingException e) { + throw new SerDeException(e); + } + for (int i = 0; i < fieldRefs.size(); i++) { + StructField fieldRef = fieldRefs.get(i); + fieldOI = fieldRef.getFieldObjectInspector(); + Object fieldData = soi.getStructFieldData(rowObj, fieldRef); + deserializedFields[i] = SerDeUtils.toThriftPayload(fieldData, fieldOI, + protocol.getValue()); + } + rowSet.addRow(deserializedFields); + } + + return rowSet; + } + + private AbstractSerDe getSerDe() throws SQLException { + if (serde != null) { + return serde; + } + + try { + TableSchema schema = getSchema(); + List descriptors = schema.getColumnDescriptors(); + StringBuilder nameSb = new StringBuilder(); + StringBuilder typesSb = new StringBuilder(); + + if (schema != null) { + for (int pos = 0; pos < schema.getSize(); pos++) { + if (pos != 0) { + nameSb.append(","); + typesSb.append(","); + } + nameSb.append(descriptors.get(pos).getName()); + 
typesSb.append(descriptors.get(pos).getTypeName().toLowerCase()); + } + } + String names = nameSb.toString(); + String types = typesSb.toString(); + + serde = new LazySimpleSerDe(); + Properties props = new Properties(); + if (names.length() > 0) { + props.setProperty(serdeConstants.LIST_COLUMNS, names); + } + if (types.length() > 0) { + props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types); + } + SerDeUtils.initializeSerDe(serde, new Configuration(), props, null); + } catch (Exception ex) { + ex.printStackTrace(); + throw new SQLException("Could not create ResultSet: " + ex.getMessage(), ex); + } + return serde; + } + @Override public ResultSetMetaData getMetaData() throws SQLException { if (isClosed) { diff --git jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 180f99e8..0f70544 100644 --- jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -61,6 +61,7 @@ Map sessConf = new HashMap(); private int fetchSize = DEFAULT_FETCH_SIZE; private boolean isScrollableResultset = false; + private String finalDirUri; /** * We need to keep a reference to the result set to support the following: * @@ -246,6 +247,9 @@ public boolean execute(String sql) throws SQLException { TExecuteStatementResp execResp = client.ExecuteStatement(execReq); Utils.verifySuccessWithInfo(execResp.getStatus()); stmtHandle = execResp.getOperationHandle(); + if (execResp.isSetFinalDirUri()) { + finalDirUri = execResp.getFinalDirUri(); + } isExecuteStatementFailed = false; } catch (SQLException eS) { isExecuteStatementFailed = true; @@ -305,7 +309,7 @@ public boolean execute(String sql) throws SQLException { } resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle) .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) - .setScrollable(isScrollableResultset) + .setScrollable(isScrollableResultset).setFinalDirUri(finalDirUri) .build(); return true; } diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 8fafd61..17bfba9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1876,6 +1876,22 @@ public boolean getResults(List res) throws IOException, CommandNeedRetryExceptio return true; } + public String getFinalDirName() { + if (useBypass()) { + return plan.getFetchTask().getTableDirName(); + } + return null; + } + + public boolean useBypass() { + if(!conf.getBoolVar(ConfVars.HIVE_SERVER2_WEBHDFS_BYPASS_ENABLED)) + return false; + int jobs = Utilities.getMRTasks(plan.getRootTasks()).size() + + Utilities.getTezTasks(plan.getRootTasks()).size() + + Utilities.getSparkTasks(plan.getRootTasks()).size(); + return jobs > 0; + } + public void resetFetch() throws IOException { if (isFetchingTable()) { try { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index 1634143..172d388 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -111,6 +111,13 @@ public TableDesc getTblDesc() { } /** + * Return the tableDirName of the fetchWork. + */ + public String getTableDirName() { + return work.getTblDir().toString(); + } + + /** * Return the maximum number of rows returned by fetch. 
*/ public int getMaxRows() { diff --git service/if/TCLIService.thrift service/if/TCLIService.thrift index baf583f..08826ef 100644 --- service/if/TCLIService.thrift +++ service/if/TCLIService.thrift @@ -60,6 +60,9 @@ enum TProtocolVersion { // V8 adds support for interval types HIVE_CLI_SERVICE_PROTOCOL_V8 + + // V9 adds an optional URI of result data to TExecuteStatementResp + HIVE_CLI_SERVICE_PROTOCOL_V9 } enum TTypeId { @@ -700,6 +703,10 @@ struct TExecuteStatementReq { struct TExecuteStatementResp { 1: required TStatus status 2: optional TOperationHandle operationHandle + + // When the client can download result data directly via WebHDFS, + // finalDirUri holds the HDFS path of the result directory; otherwise it is unset and clients use FetchResults. + 3: optional string finalDirUri } // GetTypeInfo() diff --git service/src/gen/thrift/gen-cpp/TCLIService_types.cpp service/src/gen/thrift/gen-cpp/TCLIService_types.cpp index b852379..52b91bb 100644 --- service/src/gen/thrift/gen-cpp/TCLIService_types.cpp +++ service/src/gen/thrift/gen-cpp/TCLIService_types.cpp @@ -21,7 +21,8 @@ int _kTProtocolVersionValues[] = { TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V5, TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6, TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V7, - TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V8 + TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V8, + TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V9 }; const char* _kTProtocolVersionNames[] = { "HIVE_CLI_SERVICE_PROTOCOL_V1", @@ -31,9 +32,10 @@ const char* _kTProtocolVersionNames[] = { "HIVE_CLI_SERVICE_PROTOCOL_V5", "HIVE_CLI_SERVICE_PROTOCOL_V6", "HIVE_CLI_SERVICE_PROTOCOL_V7", - "HIVE_CLI_SERVICE_PROTOCOL_V8" + "HIVE_CLI_SERVICE_PROTOCOL_V8", + "HIVE_CLI_SERVICE_PROTOCOL_V9" }; -const std::map _TProtocolVersion_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(8, _kTProtocolVersionValues, _kTProtocolVersionNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map _TProtocolVersion_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(9, _kTProtocolVersionValues, _kTProtocolVersionNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); int _kTTypeIdValues[] = { TTypeId::BOOLEAN_TYPE, @@ -5702,6 +5704,11 @@ void TExecuteStatementResp::__set_operationHandle(const TOperationHandle& val) { __isset.operationHandle = true; } +void TExecuteStatementResp::__set_finalDirUri(const std::string& val) { + this->finalDirUri = val; +__isset.finalDirUri = true; +} + uint32_t TExecuteStatementResp::read(::apache::thrift::protocol::TProtocol* iprot) { apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); @@ -5740,6 +5747,14 @@ uint32_t TExecuteStatementResp::read(::apache::thrift::protocol::TProtocol* ipro xfer += iprot->skip(ftype); } break; + case 3: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->finalDirUri); + this->__isset.finalDirUri = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -5768,6 +5783,11 @@ uint32_t TExecuteStatementResp::write(::apache::thrift::protocol::TProtocol* opr xfer += this->operationHandle.write(oprot); xfer += oprot->writeFieldEnd(); } + if (this->__isset.finalDirUri) { + xfer += oprot->writeFieldBegin("finalDirUri", ::apache::thrift::protocol::T_STRING, 3); + xfer += oprot->writeString(this->finalDirUri); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -5777,17 +5797,20 @@ void swap(TExecuteStatementResp &a, TExecuteStatementResp &b) { using ::std::swap; 
swap(a.status, b.status); swap(a.operationHandle, b.operationHandle); + swap(a.finalDirUri, b.finalDirUri); swap(a.__isset, b.__isset); } TExecuteStatementResp::TExecuteStatementResp(const TExecuteStatementResp& other224) { status = other224.status; operationHandle = other224.operationHandle; + finalDirUri = other224.finalDirUri; __isset = other224.__isset; } TExecuteStatementResp& TExecuteStatementResp::operator=(const TExecuteStatementResp& other225) { status = other225.status; operationHandle = other225.operationHandle; + finalDirUri = other225.finalDirUri; __isset = other225.__isset; return *this; } @@ -5796,6 +5819,7 @@ void TExecuteStatementResp::printTo(std::ostream& out) const { out << "TExecuteStatementResp("; out << "status=" << to_string(status); out << ", " << "operationHandle="; (__isset.operationHandle ? (out << to_string(operationHandle)) : (out << "")); + out << ", " << "finalDirUri="; (__isset.finalDirUri ? (out << to_string(finalDirUri)) : (out << "")); out << ")"; } diff --git service/src/gen/thrift/gen-cpp/TCLIService_types.h service/src/gen/thrift/gen-cpp/TCLIService_types.h index b078c99..91b138c 100644 --- service/src/gen/thrift/gen-cpp/TCLIService_types.h +++ service/src/gen/thrift/gen-cpp/TCLIService_types.h @@ -28,7 +28,8 @@ struct TProtocolVersion { HIVE_CLI_SERVICE_PROTOCOL_V5 = 4, HIVE_CLI_SERVICE_PROTOCOL_V6 = 5, HIVE_CLI_SERVICE_PROTOCOL_V7 = 6, - HIVE_CLI_SERVICE_PROTOCOL_V8 = 7 + HIVE_CLI_SERVICE_PROTOCOL_V8 = 7, + HIVE_CLI_SERVICE_PROTOCOL_V9 = 8 }; }; @@ -2543,8 +2544,9 @@ inline std::ostream& operator<<(std::ostream& out, const TExecuteStatementReq& o } typedef struct _TExecuteStatementResp__isset { - _TExecuteStatementResp__isset() : operationHandle(false) {} + _TExecuteStatementResp__isset() : operationHandle(false), finalDirUri(false) {} bool operationHandle :1; + bool finalDirUri :1; } _TExecuteStatementResp__isset; class TExecuteStatementResp { @@ -2552,12 +2554,13 @@ class TExecuteStatementResp { TExecuteStatementResp(const TExecuteStatementResp&); TExecuteStatementResp& operator=(const TExecuteStatementResp&); - TExecuteStatementResp() { + TExecuteStatementResp() : finalDirUri() { } virtual ~TExecuteStatementResp() throw(); TStatus status; TOperationHandle operationHandle; + std::string finalDirUri; _TExecuteStatementResp__isset __isset; @@ -2565,6 +2568,8 @@ class TExecuteStatementResp { void __set_operationHandle(const TOperationHandle& val); + void __set_finalDirUri(const std::string& val); + bool operator == (const TExecuteStatementResp & rhs) const { if (!(status == rhs.status)) @@ -2573,6 +2578,10 @@ class TExecuteStatementResp { return false; else if (__isset.operationHandle && !(operationHandle == rhs.operationHandle)) return false; + if (__isset.finalDirUri != rhs.__isset.finalDirUri) + return false; + else if (__isset.finalDirUri && !(finalDirUri == rhs.finalDirUri)) + return false; return true; } bool operator != (const TExecuteStatementResp &rhs) const { diff --git service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TExecuteStatementResp.java service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TExecuteStatementResp.java index 0b9aa0f..94dea2e 100644 --- service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TExecuteStatementResp.java +++ service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TExecuteStatementResp.java @@ -40,6 +40,7 @@ private static final org.apache.thrift.protocol.TField STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("status", 
org.apache.thrift.protocol.TType.STRUCT, (short)1); private static final org.apache.thrift.protocol.TField OPERATION_HANDLE_FIELD_DESC = new org.apache.thrift.protocol.TField("operationHandle", org.apache.thrift.protocol.TType.STRUCT, (short)2); + private static final org.apache.thrift.protocol.TField FINAL_DIR_URI_FIELD_DESC = new org.apache.thrift.protocol.TField("finalDirUri", org.apache.thrift.protocol.TType.STRING, (short)3); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -49,11 +50,13 @@ private TStatus status; // required private TOperationHandle operationHandle; // optional + private String finalDirUri; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { STATUS((short)1, "status"), - OPERATION_HANDLE((short)2, "operationHandle"); + OPERATION_HANDLE((short)2, "operationHandle"), + FINAL_DIR_URI((short)3, "finalDirUri"); private static final Map byName = new HashMap(); @@ -72,6 +75,8 @@ public static _Fields findByThriftId(int fieldId) { return STATUS; case 2: // OPERATION_HANDLE return OPERATION_HANDLE; + case 3: // FINAL_DIR_URI + return FINAL_DIR_URI; default: return null; } @@ -112,7 +117,7 @@ public String getFieldName() { } // isset id assignments - private static final _Fields optionals[] = {_Fields.OPERATION_HANDLE}; + private static final _Fields optionals[] = {_Fields.OPERATION_HANDLE,_Fields.FINAL_DIR_URI}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -120,6 +125,8 @@ public String getFieldName() { new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TStatus.class))); tmpMap.put(_Fields.OPERATION_HANDLE, new org.apache.thrift.meta_data.FieldMetaData("operationHandle", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TOperationHandle.class))); + tmpMap.put(_Fields.FINAL_DIR_URI, new org.apache.thrift.meta_data.FieldMetaData("finalDirUri", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TExecuteStatementResp.class, metaDataMap); } @@ -144,6 +151,9 @@ public TExecuteStatementResp(TExecuteStatementResp other) { if (other.isSetOperationHandle()) { this.operationHandle = new TOperationHandle(other.operationHandle); } + if (other.isSetFinalDirUri()) { + this.finalDirUri = other.finalDirUri; + } } public TExecuteStatementResp deepCopy() { @@ -154,6 +164,7 @@ public TExecuteStatementResp deepCopy() { public void clear() { this.status = null; this.operationHandle = null; + this.finalDirUri = null; } public TStatus getStatus() { @@ -202,6 +213,29 @@ public void setOperationHandleIsSet(boolean value) { } } + public String getFinalDirUri() { + return this.finalDirUri; + } + + public void setFinalDirUri(String finalDirUri) { + this.finalDirUri = finalDirUri; + } + + public void unsetFinalDirUri() { + this.finalDirUri = null; + } + + /** Returns true if field finalDirUri is set (has been assigned a value) and false otherwise */ + public boolean isSetFinalDirUri() { + return 
this.finalDirUri != null; + } + + public void setFinalDirUriIsSet(boolean value) { + if (!value) { + this.finalDirUri = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case STATUS: @@ -220,6 +254,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case FINAL_DIR_URI: + if (value == null) { + unsetFinalDirUri(); + } else { + setFinalDirUri((String)value); + } + break; + } } @@ -231,6 +273,9 @@ public Object getFieldValue(_Fields field) { case OPERATION_HANDLE: return getOperationHandle(); + case FINAL_DIR_URI: + return getFinalDirUri(); + } throw new IllegalStateException(); } @@ -246,6 +291,8 @@ public boolean isSet(_Fields field) { return isSetStatus(); case OPERATION_HANDLE: return isSetOperationHandle(); + case FINAL_DIR_URI: + return isSetFinalDirUri(); } throw new IllegalStateException(); } @@ -281,6 +328,15 @@ public boolean equals(TExecuteStatementResp that) { return false; } + boolean this_present_finalDirUri = true && this.isSetFinalDirUri(); + boolean that_present_finalDirUri = true && that.isSetFinalDirUri(); + if (this_present_finalDirUri || that_present_finalDirUri) { + if (!(this_present_finalDirUri && that_present_finalDirUri)) + return false; + if (!this.finalDirUri.equals(that.finalDirUri)) + return false; + } + return true; } @@ -298,6 +354,11 @@ public int hashCode() { if (present_operationHandle) list.add(operationHandle); + boolean present_finalDirUri = true && (isSetFinalDirUri()); + list.add(present_finalDirUri); + if (present_finalDirUri) + list.add(finalDirUri); + return list.hashCode(); } @@ -329,6 +390,16 @@ public int compareTo(TExecuteStatementResp other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetFinalDirUri()).compareTo(other.isSetFinalDirUri()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetFinalDirUri()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.finalDirUri, other.finalDirUri); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -366,6 +437,16 @@ public String toString() { } first = false; } + if (isSetFinalDirUri()) { + if (!first) sb.append(", "); + sb.append("finalDirUri:"); + if (this.finalDirUri == null) { + sb.append("null"); + } else { + sb.append(this.finalDirUri); + } + first = false; + } sb.append(")"); return sb.toString(); } @@ -437,6 +518,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TExecuteStatementRe org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 3: // FINAL_DIR_URI + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.finalDirUri = iprot.readString(); + struct.setFinalDirUriIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -462,6 +551,13 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TExecuteStatementR oprot.writeFieldEnd(); } } + if (struct.finalDirUri != null) { + if (struct.isSetFinalDirUri()) { + oprot.writeFieldBegin(FINAL_DIR_URI_FIELD_DESC); + oprot.writeString(struct.finalDirUri); + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -484,10 +580,16 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TExecuteStatementRe if (struct.isSetOperationHandle()) { optionals.set(0); } - oprot.writeBitSet(optionals, 1); + if (struct.isSetFinalDirUri()) { + optionals.set(1); + } + 
oprot.writeBitSet(optionals, 2); if (struct.isSetOperationHandle()) { struct.operationHandle.write(oprot); } + if (struct.isSetFinalDirUri()) { + oprot.writeString(struct.finalDirUri); + } } @Override @@ -496,12 +598,16 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TExecuteStatementRes struct.status = new TStatus(); struct.status.read(iprot); struct.setStatusIsSet(true); - BitSet incoming = iprot.readBitSet(1); + BitSet incoming = iprot.readBitSet(2); if (incoming.get(0)) { struct.operationHandle = new TOperationHandle(); struct.operationHandle.read(iprot); struct.setOperationHandleIsSet(true); } + if (incoming.get(1)) { + struct.finalDirUri = iprot.readString(); + struct.setFinalDirUriIsSet(true); + } } } diff --git service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TProtocolVersion.java service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TProtocolVersion.java index c936ada..d9e17a9 100644 --- service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TProtocolVersion.java +++ service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TProtocolVersion.java @@ -19,7 +19,8 @@ HIVE_CLI_SERVICE_PROTOCOL_V5(4), HIVE_CLI_SERVICE_PROTOCOL_V6(5), HIVE_CLI_SERVICE_PROTOCOL_V7(6), - HIVE_CLI_SERVICE_PROTOCOL_V8(7); + HIVE_CLI_SERVICE_PROTOCOL_V8(7), + HIVE_CLI_SERVICE_PROTOCOL_V9(8); private final int value; @@ -56,6 +57,8 @@ public static TProtocolVersion findByValue(int value) { return HIVE_CLI_SERVICE_PROTOCOL_V7; case 7: return HIVE_CLI_SERVICE_PROTOCOL_V8; + case 8: + return HIVE_CLI_SERVICE_PROTOCOL_V9; default: return null; } diff --git service/src/gen/thrift/gen-py/TCLIService/ttypes.py service/src/gen/thrift/gen-py/TCLIService/ttypes.py index ef5f5f5..74118ab 100644 --- service/src/gen/thrift/gen-py/TCLIService/ttypes.py +++ service/src/gen/thrift/gen-py/TCLIService/ttypes.py @@ -25,6 +25,7 @@ class TProtocolVersion: HIVE_CLI_SERVICE_PROTOCOL_V6 = 5 HIVE_CLI_SERVICE_PROTOCOL_V7 = 6 HIVE_CLI_SERVICE_PROTOCOL_V8 = 7 + HIVE_CLI_SERVICE_PROTOCOL_V9 = 8 _VALUES_TO_NAMES = { 0: "HIVE_CLI_SERVICE_PROTOCOL_V1", @@ -35,6 +36,7 @@ class TProtocolVersion: 5: "HIVE_CLI_SERVICE_PROTOCOL_V6", 6: "HIVE_CLI_SERVICE_PROTOCOL_V7", 7: "HIVE_CLI_SERVICE_PROTOCOL_V8", + 8: "HIVE_CLI_SERVICE_PROTOCOL_V9", } _NAMES_TO_VALUES = { @@ -46,6 +48,7 @@ class TProtocolVersion: "HIVE_CLI_SERVICE_PROTOCOL_V6": 5, "HIVE_CLI_SERVICE_PROTOCOL_V7": 6, "HIVE_CLI_SERVICE_PROTOCOL_V8": 7, + "HIVE_CLI_SERVICE_PROTOCOL_V9": 8, } class TTypeId: @@ -4253,17 +4256,20 @@ class TExecuteStatementResp: Attributes: - status - operationHandle + - finalDirUri """ thrift_spec = ( None, # 0 (1, TType.STRUCT, 'status', (TStatus, TStatus.thrift_spec), None, ), # 1 (2, TType.STRUCT, 'operationHandle', (TOperationHandle, TOperationHandle.thrift_spec), None, ), # 2 + (3, TType.STRING, 'finalDirUri', None, None, ), # 3 ) - def __init__(self, status=None, operationHandle=None,): + def __init__(self, status=None, operationHandle=None, finalDirUri=None,): self.status = status self.operationHandle = operationHandle + self.finalDirUri = finalDirUri def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -4286,6 +4292,11 @@ def read(self, iprot): self.operationHandle.read(iprot) else: iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.finalDirUri = iprot.readString() + else: + iprot.skip(ftype) else: 
iprot.skip(ftype) iprot.readFieldEnd() @@ -4304,6 +4315,10 @@ def write(self, oprot): oprot.writeFieldBegin('operationHandle', TType.STRUCT, 2) self.operationHandle.write(oprot) oprot.writeFieldEnd() + if self.finalDirUri is not None: + oprot.writeFieldBegin('finalDirUri', TType.STRING, 3) + oprot.writeString(self.finalDirUri) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -4317,6 +4332,7 @@ def __hash__(self): value = 17 value = (value * 31) ^ hash(self.status) value = (value * 31) ^ hash(self.operationHandle) + value = (value * 31) ^ hash(self.finalDirUri) return value def __repr__(self): diff --git service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote index e167d5b..093848c 100755 --- service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote +++ service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote @@ -54,6 +54,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help': print(' void drop_table(string dbname, string name, bool deleteData)') print(' void drop_table_with_environment_context(string dbname, string name, bool deleteData, EnvironmentContext environment_context)') print(' get_tables(string db_name, string pattern)') + print(' get_table_meta(string db_patterns, string tbl_patterns, tbl_types)') print(' get_all_tables(string db_name)') print(' Table get_table(string dbname, string tbl_name)') print(' get_table_objects_by_name(string dbname, tbl_names)') @@ -77,6 +78,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help': print(' DropPartitionsResult drop_partitions_req(DropPartitionsRequest req)') print(' Partition get_partition(string db_name, string tbl_name, part_vals)') print(' Partition exchange_partition( partitionSpecs, string source_db, string source_table_name, string dest_db, string dest_table_name)') + print(' exchange_partitions( partitionSpecs, string source_db, string source_table_name, string dest_db, string dest_table_name)') print(' Partition get_partition_with_auth(string db_name, string tbl_name, part_vals, string user_name, group_names)') print(' Partition get_partition_by_name(string db_name, string tbl_name, string part_name)') print(' get_partitions(string db_name, string tbl_name, i16 max_parts)') @@ -411,6 +413,12 @@ elif cmd == 'get_tables': sys.exit(1) pp.pprint(client.get_tables(args[0],args[1],)) +elif cmd == 'get_table_meta': + if len(args) != 3: + print('get_table_meta requires 3 args') + sys.exit(1) + pp.pprint(client.get_table_meta(args[0],args[1],eval(args[2]),)) + elif cmd == 'get_all_tables': if len(args) != 1: print('get_all_tables requires 1 args') @@ -549,6 +557,12 @@ elif cmd == 'exchange_partition': sys.exit(1) pp.pprint(client.exchange_partition(eval(args[0]),args[1],args[2],args[3],args[4],)) +elif cmd == 'exchange_partitions': + if len(args) != 5: + print('exchange_partitions requires 5 args') + sys.exit(1) + pp.pprint(client.exchange_partitions(eval(args[0]),args[1],args[2],args[3],args[4],)) + elif cmd == 'get_partition_with_auth': if len(args) != 5: print('get_partition_with_auth requires 5 args') diff --git service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb index f004ec4..71b137d 100644 --- service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb +++ service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb @@ -15,8 +15,9 @@ module TProtocolVersion HIVE_CLI_SERVICE_PROTOCOL_V6 = 5 HIVE_CLI_SERVICE_PROTOCOL_V7 = 6 HIVE_CLI_SERVICE_PROTOCOL_V8 = 7 - VALUE_MAP = {0 => 
"HIVE_CLI_SERVICE_PROTOCOL_V1", 1 => "HIVE_CLI_SERVICE_PROTOCOL_V2", 2 => "HIVE_CLI_SERVICE_PROTOCOL_V3", 3 => "HIVE_CLI_SERVICE_PROTOCOL_V4", 4 => "HIVE_CLI_SERVICE_PROTOCOL_V5", 5 => "HIVE_CLI_SERVICE_PROTOCOL_V6", 6 => "HIVE_CLI_SERVICE_PROTOCOL_V7", 7 => "HIVE_CLI_SERVICE_PROTOCOL_V8"} - VALID_VALUES = Set.new([HIVE_CLI_SERVICE_PROTOCOL_V1, HIVE_CLI_SERVICE_PROTOCOL_V2, HIVE_CLI_SERVICE_PROTOCOL_V3, HIVE_CLI_SERVICE_PROTOCOL_V4, HIVE_CLI_SERVICE_PROTOCOL_V5, HIVE_CLI_SERVICE_PROTOCOL_V6, HIVE_CLI_SERVICE_PROTOCOL_V7, HIVE_CLI_SERVICE_PROTOCOL_V8]).freeze + HIVE_CLI_SERVICE_PROTOCOL_V9 = 8 + VALUE_MAP = {0 => "HIVE_CLI_SERVICE_PROTOCOL_V1", 1 => "HIVE_CLI_SERVICE_PROTOCOL_V2", 2 => "HIVE_CLI_SERVICE_PROTOCOL_V3", 3 => "HIVE_CLI_SERVICE_PROTOCOL_V4", 4 => "HIVE_CLI_SERVICE_PROTOCOL_V5", 5 => "HIVE_CLI_SERVICE_PROTOCOL_V6", 6 => "HIVE_CLI_SERVICE_PROTOCOL_V7", 7 => "HIVE_CLI_SERVICE_PROTOCOL_V8", 8 => "HIVE_CLI_SERVICE_PROTOCOL_V9"} + VALID_VALUES = Set.new([HIVE_CLI_SERVICE_PROTOCOL_V1, HIVE_CLI_SERVICE_PROTOCOL_V2, HIVE_CLI_SERVICE_PROTOCOL_V3, HIVE_CLI_SERVICE_PROTOCOL_V4, HIVE_CLI_SERVICE_PROTOCOL_V5, HIVE_CLI_SERVICE_PROTOCOL_V6, HIVE_CLI_SERVICE_PROTOCOL_V7, HIVE_CLI_SERVICE_PROTOCOL_V8, HIVE_CLI_SERVICE_PROTOCOL_V9]).freeze end module TTypeId @@ -1153,10 +1154,12 @@ class TExecuteStatementResp include ::Thrift::Struct, ::Thrift::Struct_Union STATUS = 1 OPERATIONHANDLE = 2 + FINALDIRURI = 3 FIELDS = { STATUS => {:type => ::Thrift::Types::STRUCT, :name => 'status', :class => ::TStatus}, - OPERATIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'operationHandle', :class => ::TOperationHandle, :optional => true} + OPERATIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'operationHandle', :class => ::TOperationHandle, :optional => true}, + FINALDIRURI => {:type => ::Thrift::Types::STRING, :name => 'finalDirUri', :optional => true} } def struct_fields; FIELDS; end diff --git service/src/java/org/apache/hive/service/cli/CLIService.java service/src/java/org/apache/hive/service/cli/CLIService.java index adc9809..ebe03fb 100644 --- service/src/java/org/apache/hive/service/cli/CLIService.java +++ service/src/java/org/apache/hive/service/cli/CLIService.java @@ -487,6 +487,13 @@ public void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory au LOG.info(sessionHandle + ": renewDelegationToken()"); } + public String getFinalDirUri(OperationHandle opHandle) + throws HiveSQLException { + String uri = sessionManager.getOperationManager().getOperation(opHandle) + .getParentSession().getFinalDirUri(opHandle); + return uri; + } + public SessionManager getSessionManager() { return sessionManager; } diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java index 25cefc2..e4cc105 100644 --- service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -335,6 +335,10 @@ public void cancel() throws HiveSQLException { public abstract RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException; + public String getFinalDirName() throws HiveSQLException { + return null; + } + public RowSet getNextRowSet() throws HiveSQLException { return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS); } diff --git service/src/java/org/apache/hive/service/cli/operation/OperationManager.java service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 
b0bd351..890e5fc 100644 --- service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -296,6 +296,11 @@ public OperationLog getOperationLogByThread() { return OperationLog.getCurrentOperationLog(); } + public String getOperationFinalDirUri(OperationHandle opHandle) + throws HiveSQLException { + return getOperation(opHandle).getFinalDirName(); + } + public List removeExpiredOperations(OperationHandle[] handles) { List removed = new ArrayList(); for (OperationHandle handle : handles) { diff --git service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 1331a99..6a28294 100644 --- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.exec.ExplainTask; +import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -370,6 +371,11 @@ public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws H } } + @Override + public String getFinalDirName() throws HiveSQLException { + return driver.getFinalDirName(); + } + private RowSet decode(List rows, RowSet rowSet) throws Exception { if (driver.isFetchingTable()) { return prepareFromRow(rows, rowSet); diff --git service/src/java/org/apache/hive/service/cli/session/HiveSession.java service/src/java/org/apache/hive/service/cli/session/HiveSession.java index 4f4e92d..4e8c99b 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSession.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSession.java @@ -163,4 +163,7 @@ void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr) void closeExpiredOperations(); long getNoOperationTime(); + + String getFinalDirUri(OperationHandle opHandle) + throws HiveSQLException; } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index a14908b..23b85b0 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -817,6 +817,12 @@ public void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr) authFactory.renewDelegationToken(tokenStr); } + @Override + public String getFinalDirUri(OperationHandle opHandle) + throws HiveSQLException { + return operationManager.getOperationFinalDirUri(opHandle); + } + // extract the real user from the given token string private String getUserFromToken(HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException { return authFactory.getUserFromToken(tokenStr); diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 8434965..800ad90 100644 --- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -508,6 +508,10 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T : 
cliService.executeStatement(sessionHandle, statement, confOverlay); resp.setOperationHandle(operationHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); + String finalDirUri = cliService.getFinalDirUri(operationHandle); + if (finalDirUri != null) { + resp.setFinalDirUri(finalDirUri); + } } catch (Exception e) { // Note: it's rather important that this (and other methods) catch Exception, not Throwable; // in combination with HiveSessionProxy.invoke code, perhaps unintentionally, it used
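
Usage note (not part of the patch): with this change, HiveServer2 can return the HDFS directory that holds a query's final results (finalDirUri in TExecuteStatementResp), and the JDBC driver reads that directory directly instead of paging rows through FetchResults. The sketch below shows that client code does not change; it assumes hive.server2.webhdfs.bypass.enabled=true in the server's hive-site.xml, a query that actually launches at least one MR/Tez/Spark job (see Driver.useBypass()), and a client host that can reach the cluster's HDFS. Host name, credentials, and the table are placeholders.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class WebHdfsBypassExample {
  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    // Placeholder connection details; the bypass is negotiated server-side,
    // so this code is identical with or without hive.server2.webhdfs.bypass.enabled.
    try (Connection conn = DriverManager.getConnection(
             "jdbc:hive2://hs2-host:10000/default", "user", "");
         Statement stmt = conn.createStatement();
         // If the server set finalDirUri in TExecuteStatementResp, HiveQueryResultSet
         // reads the files under that HDFS directory instead of calling FetchResults.
         ResultSet rs = stmt.executeQuery("SELECT key, value FROM src WHERE key > 10")) {
      while (rs.next()) {
        System.out.println(rs.getString(1) + "\t" + rs.getString(2));
      }
    }
  }
}
```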
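The new client-side read path in HiveQueryResultSet.next() lists the result directory, reads each file line by line, and decodes the lines with a LazySimpleSerDe configured from the result schema (decodeFromString()/getSerDe()). The standalone sketch below walks the same steps under simplifying assumptions: a hard-coded two-column string schema instead of one derived from getSchema(), the default ^A-delimited text layout of the fetch directory, and readers closed with try-with-resources. The class name, the finalDirUri argument, and the schema strings are illustrative only.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BytesWritable;

public class FinalDirReaderSketch {
  public static void main(String[] args) throws Exception {
    String finalDirUri = args[0];      // an hdfs:// directory returned by the server
    String columnNames = "key,value";  // placeholder schema; the driver builds this from getSchema()
    String columnTypes = "string,string";

    // Configure a LazySimpleSerDe the same way HiveQueryResultSet.getSerDe() does.
    LazySimpleSerDe serde = new LazySimpleSerDe();
    Properties props = new Properties();
    props.setProperty(serdeConstants.LIST_COLUMNS, columnNames);
    props.setProperty(serdeConstants.LIST_COLUMN_TYPES, columnTypes);
    SerDeUtils.initializeSerDe(serde, new Configuration(), props, null);
    StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector();

    // List the result directory and decode each delimited line into a row.
    FileSystem fs = FileSystem.get(URI.create(finalDirUri), new Configuration());
    for (FileStatus file : fs.listStatus(new Path(finalDirUri))) {
      if (!file.isFile()) {
        continue;
      }
      try (BufferedReader br = new BufferedReader(
               new InputStreamReader(fs.open(file.getPath()), StandardCharsets.UTF_8))) {
        String line;
        while ((line = br.readLine()) != null) {
          Object row = serde.deserialize(new BytesWritable(line.getBytes(StandardCharsets.UTF_8)));
          System.out.println(SerDeUtils.getJSONString(row, soi));
        }
      }
    }
  }
}
```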
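Because finalDirUri is an optional Thrift field (id 3), older clients skip it on the wire and keep using FetchResults, while newer clients gate on isSetFinalDirUri() the way HiveStatement.execute() does. A small round-trip sketch of that contract, using the generated Java bindings and placeholder values:

```java
import org.apache.hive.service.cli.thrift.TExecuteStatementResp;
import org.apache.hive.service.cli.thrift.TStatus;
import org.apache.hive.service.cli.thrift.TStatusCode;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

public class FinalDirUriFieldDemo {
  public static void main(String[] args) throws Exception {
    // Server side: populate the new optional field only when the bypass applies.
    TExecuteStatementResp resp = new TExecuteStatementResp();
    resp.setStatus(new TStatus(TStatusCode.SUCCESS_STATUS));
    resp.setFinalDirUri("hdfs://namenode:8020/tmp/hive/results"); // placeholder path

    // Round-trip through the binary protocol, as the Thrift transport would.
    byte[] onTheWire = new TSerializer(new TBinaryProtocol.Factory()).serialize(resp);
    TExecuteStatementResp received = new TExecuteStatementResp();
    new TDeserializer(new TBinaryProtocol.Factory()).deserialize(received, onTheWire);

    // Client side: mirror HiveStatement.execute() and only use the directory if it was set.
    if (received.isSetFinalDirUri()) {
      System.out.println("read results from " + received.getFinalDirUri());
    } else {
      System.out.println("fall back to FetchResults()");
    }
  }
}
```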