diff --git be/src/runtime/coordinator.cc be/src/runtime/coordinator.cc
index fd6adfe..457513f 100644
--- be/src/runtime/coordinator.cc
+++ be/src/runtime/coordinator.cc
@@ -1125,6 +1125,7 @@ Status Coordinator::ExecRemoteFragment(void* exec_state_arg) {
   TExecPlanFragmentResult thrift_result;
   Status rpc_status = backend_client.DoRpc(&ImpalaInternalServiceClient::ExecPlanFragment,
       exec_state->rpc_params, &thrift_result);
+
   if (!rpc_status.ok()) {
     stringstream msg;
     msg << "ExecPlanRequest rpc query_id=" << query_id_
@@ -1575,7 +1576,9 @@ void Coordinator::SetExecPlanFragmentParams(
     const TNetworkAddress& coord, TExecPlanFragmentParams* rpc_params) {
   rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
   rpc_params->__set_fragment(fragment);
-  rpc_params->__set_desc_tbl(desc_tbl_);
+
+  SetExecPlanDescriptorTable(fragment, rpc_params);
+
   TNetworkAddress exec_host = params.hosts[instance_idx];
   if (schedule.HasReservation()) {
     // The reservation has already been validated at this point.
@@ -1612,4 +1615,63 @@ void Coordinator::SetExecPlanFragmentParams(
   rpc_params->__isset.fragment_instance_ctx = true;
 }
 
+void Coordinator::SetExecPlanDescriptorTable(const TPlanFragment& fragment,
+    TExecPlanFragmentParams* rpc_params) {
+  std::string table_name = "";
+  std::string db_name = "";
+  TDescriptorTable thrift_desc_tbl;
+  bool use_custom_descriptor = false;
+
+  // If this fragment writes to an HDFS table sink, keep the descriptor of the
+  // sink's target table.
+  if (fragment.__isset.output_sink) {
+    if (fragment.output_sink.__isset.table_sink &&
+        fragment.output_sink.type == TDataSinkType::TABLE_SINK) {
+      for (size_t i = 0; i < desc_tbl_.tableDescriptors.size(); ++i) {
+        const TTableDescriptor& table_desc = desc_tbl_.tableDescriptors[i];
+        if (table_desc.tableType == TTableType::HDFS_TABLE &&
+            table_desc.id == fragment.output_sink.table_sink.target_table_id) {
+          thrift_desc_tbl.tableDescriptors.push_back(table_desc);
+          thrift_desc_tbl.__isset.tableDescriptors = true;
+          use_custom_descriptor = true;
+        }
+      }
+    }
+  }
+
+  // For every HDFS scan node in the plan, keep the descriptor of the scanned table.
+  for (size_t i = 0; i < fragment.plan.nodes.size(); ++i) {
+    const TPlanNode& plan_node = fragment.plan.nodes[i];
+    if (plan_node.__isset.hdfs_scan_node) {
+      table_name = plan_node.hdfs_scan_node.table_name;
+      db_name = plan_node.hdfs_scan_node.db_name;
+      if (table_name.empty() || db_name.empty()) continue;
+      for (size_t j = 0; j < desc_tbl_.tableDescriptors.size(); ++j) {
+        const TTableDescriptor& table_desc = desc_tbl_.tableDescriptors[j];
+        if (table_desc.tableType == TTableType::HDFS_TABLE &&
+            table_name == table_desc.tableName && db_name == table_desc.dbName) {
+          thrift_desc_tbl.tableDescriptors.push_back(table_desc);
+          thrift_desc_tbl.__isset.tableDescriptors = true;
+          use_custom_descriptor = true;
+        }
+      }
+    }
+  }
+
+  if (use_custom_descriptor) {
+    // Send the trimmed table descriptors together with all tuple and slot descriptors.
+    for (size_t i = 0; i < desc_tbl_.tupleDescriptors.size(); ++i) {
+      thrift_desc_tbl.tupleDescriptors.push_back(desc_tbl_.tupleDescriptors[i]);
+    }
+    for (size_t i = 0; i < desc_tbl_.slotDescriptors.size(); ++i) {
+      thrift_desc_tbl.slotDescriptors.push_back(desc_tbl_.slotDescriptors[i]);
+    }
+    thrift_desc_tbl.__isset.slotDescriptors = true;
+
+    rpc_params->__set_desc_tbl(thrift_desc_tbl);
+  } else {
+    // No relevant table was found: fall back to sending the full descriptor table.
+    rpc_params->__set_desc_tbl(desc_tbl_);
+  }
+}
 }
diff --git be/src/runtime/coordinator.h be/src/runtime/coordinator.h
index 5bf0436..233e760 100644
--- be/src/runtime/coordinator.h
+++ be/src/runtime/coordinator.h
@@ -262,6 +262,11 @@ class Coordinator {
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
 
+  /// Sets 'rpc_params->desc_tbl' for 'fragment', trimming the table descriptors
+  /// to the tables the fragment references when they can be identified.
+  void SetExecPlanDescriptorTable(const TPlanFragment& fragment,
+      TExecPlanFragmentParams* rpc_params);
+
   /// True if execution has completed, false otherwise.
   bool execution_completed_;
 
diff --git be/src/runtime/descriptors.cc be/src/runtime/descriptors.cc
index 4ed700a..d8146f4 100644
--- be/src/runtime/descriptors.cc
+++ be/src/runtime/descriptors.cc
@@ -451,8 +451,9 @@ string RowDescriptor::DebugString() const {
   return ss.str();
 }
 
-Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
-    DescriptorTbl** tbl) {
+Status DescriptorTbl::Create(ObjectPool* pool,
+    const TDescriptorTable& thrift_tbl, DescriptorTbl** tbl,
+    std::string tableName, std::string dbName) {
   *tbl = pool->Add(new DescriptorTbl());
   // deserialize table descriptors first, they are being referenced by tuple descriptors
   for (size_t i = 0; i < thrift_tbl.tableDescriptors.size(); ++i) {
@@ -460,7 +461,10 @@ Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
     const TTableDescriptor& tdesc = thrift_tbl.tableDescriptors[i];
     TableDescriptor* desc = NULL;
     switch (tdesc.tableType) {
       case TTableType::HDFS_TABLE:
-        desc = pool->Add(new HdfsTableDescriptor(tdesc, pool));
+        // The 'tableName'/'dbName' hint is not used to drop HDFS tables here: any
+        // table descriptor the coordinator chose to send (e.g. the target table
+        // of a sink) is still materialized.
+        desc = pool->Add(new HdfsTableDescriptor(tdesc, pool));
         break;
       case TTableType::HBASE_TABLE:
         desc = pool->Add(new HBaseTableDescriptor(tdesc));
@@ -471,6 +477,10 @@ Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
       default:
         DCHECK(false) << "invalid table type: " << tdesc.tableType;
     }
+
+    // Skip table types for which no descriptor was created.
+    if (desc == NULL) continue;
+
     (*tbl)->tbl_desc_map_[tdesc.id] = desc;
   }
 
@@ -478,9 +488,11 @@ Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
     const TTupleDescriptor& tdesc = thrift_tbl.tupleDescriptors[i];
     TupleDescriptor* desc = pool->Add(new TupleDescriptor(tdesc));
     // fix up table pointer
-    if (tdesc.__isset.tableId) {
-      desc->table_desc_ = (*tbl)->GetTableDescriptor(tdesc.tableId);
-      DCHECK(desc->table_desc_ != NULL);
+    // The table descriptor may be missing if it was pruned from the serialized
+    // descriptor table.
+    TableDescriptor* table_desc = (*tbl)->GetTableDescriptor(tdesc.tableId);
+    if (tdesc.__isset.tableId && table_desc != NULL) {
+      desc->table_desc_ = table_desc;
     }
     (*tbl)->tuple_desc_map_[tdesc.id] = desc;
   }
@@ -489,6 +500,8 @@ Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
     const TSlotDescriptor& tdesc = thrift_tbl.slotDescriptors[i];
     // Tuple descriptors are already populated in tbl
     TupleDescriptor* parent = (*tbl)->GetTupleDescriptor(tdesc.parent);
+    // Skip slots whose parent tuple was pruned from the descriptor table.
+    if (parent == NULL) continue;
     TupleDescriptor* collection_item_descriptor = tdesc.__isset.itemTupleId ?
         (*tbl)->GetTupleDescriptor(tdesc.itemTupleId) : NULL;
     SlotDescriptor* slot_d = pool->Add(
diff --git be/src/runtime/descriptors.h be/src/runtime/descriptors.h
index e36330c..10d3367 100644
--- be/src/runtime/descriptors.h
+++ be/src/runtime/descriptors.h
@@ -405,8 +405,10 @@ class DescriptorTbl {
  public:
   /// Creates a descriptor tbl within 'pool' from thrift_tbl and returns it via 'tbl'.
   /// Returns OK on success, otherwise error (in which case 'tbl' will be unset).
-  static Status Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
-      DescriptorTbl** tbl);
+  /// 'tableName' and 'dbName' optionally identify the table scanned by the fragment.
+  static Status Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
+      DescriptorTbl** tbl, std::string tableName = "",
+      std::string dbName = "");
 
   TableDescriptor* GetTableDescriptor(TableId id) const;
   TupleDescriptor* GetTupleDescriptor(TupleId id) const;
diff --git be/src/runtime/plan-fragment-executor.cc be/src/runtime/plan-fragment-executor.cc
index fd0cb46..cca2110 100644
--- be/src/runtime/plan-fragment-executor.cc
+++ be/src/runtime/plan-fragment-executor.cc
@@ -184,11 +184,23 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
       bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
           runtime_state_->resource_pool()));
 
+  // TODO: This code assumes that there is a single HdfsScanNode per fragment plan.
+  // Revisit this when bucketed tables are introduced.
+  std::string table_name = "";
+  std::string db_name = "";
+  for (size_t i = 0; i < request.fragment.plan.nodes.size(); ++i) {
+    const TPlanNode& plan_node = request.fragment.plan.nodes[i];
+    if (plan_node.__isset.hdfs_scan_node) {
+      table_name = plan_node.hdfs_scan_node.table_name;
+      db_name = plan_node.hdfs_scan_node.db_name;
+    }
+  }
+
   // set up desc tbl
   DescriptorTbl* desc_tbl = NULL;
   DCHECK(request.__isset.desc_tbl);
   RETURN_IF_ERROR(
-      DescriptorTbl::Create(obj_pool(), request.desc_tbl, &desc_tbl));
+      DescriptorTbl::Create(obj_pool(), request.desc_tbl, &desc_tbl, table_name, db_name));
   runtime_state_->set_desc_tbl(desc_tbl);
   VLOG_QUERY << "descriptor table for fragment="
              << request.fragment_instance_ctx.fragment_instance_id
diff --git common/thrift/PlanNodes.thrift common/thrift/PlanNodes.thrift
index 91e7ee7..80daf38 100644
--- common/thrift/PlanNodes.thrift
+++ common/thrift/PlanNodes.thrift
@@ -118,6 +118,10 @@ struct THdfsScanNode {
   // collection-typed slots. Maps from item tuple id to the list of conjuncts
   // to be evaluated.
   2: optional map<Types.TTupleId, list<Exprs.TExpr>> collection_conjuncts
+
+  // Name and database of the scanned table, used to trim the descriptor table
+  // sent to each fragment instance.
+  3: optional string table_name
+  4: optional string db_name
 }
 
 struct TDataSourceScanNode {
diff --git fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java
index 2070531..e808937 100644
--- fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java
+++ fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java
@@ -811,6 +811,8 @@ public class HdfsScanNode extends ScanNode {
   @Override
   protected void toThrift(TPlanNode msg) {
     msg.hdfs_scan_node = new THdfsScanNode(desc_.getId().asInt());
+    msg.hdfs_scan_node.table_name = tbl_.getName();
+    msg.hdfs_scan_node.db_name = tbl_.getDb().getName();
     msg.node_type = TPlanNodeType.HDFS_SCAN_NODE;
     if (!collectionConjuncts_.isEmpty()) {
       Map<Integer, List<TExpr>> tcollectionConjuncts = Maps.newLinkedHashMap();
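
For reference, the pruning rule the patch applies in SetExecPlanDescriptorTable, reduced to a minimal self-contained C++ sketch: keep only the table entries that match the scanned (db, table) pair, and fall back to the full set when nothing matches. TableEntry and PruneTables are illustrative stand-ins, not part of this patch or of Impala's generated Thrift types.

#include <string>
#include <vector>

// Illustrative stand-in for a serialized table descriptor entry.
struct TableEntry {
  int id;
  std::string db_name;
  std::string table_name;
};

// Keep only the entries matching (db, table); if nothing matches, return the
// full list so every table the fragment might touch can still be resolved.
std::vector<TableEntry> PruneTables(const std::vector<TableEntry>& all,
    const std::string& db, const std::string& table) {
  std::vector<TableEntry> pruned;
  for (const TableEntry& entry : all) {
    if (entry.db_name == db && entry.table_name == table) pruned.push_back(entry);
  }
  return pruned.empty() ? all : pruned;
}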