diff --git be/src/runtime/coordinator.cc be/src/runtime/coordinator.cc index 030c691..a35d16c 100644 --- be/src/runtime/coordinator.cc +++ be/src/runtime/coordinator.cc @@ -1614,92 +1614,94 @@ void Coordinator::SetExecPlanFragmentParams( } bool Coordinator::NeedTableDescriptor(const TPlanFragment& fragment) { - - // Check if there is a table sink - if (fragment.__isset.output_sink) { - if (fragment.output_sink.__isset.table_sink && fragment.output_sink.type == TDataSinkType::TABLE_SINK) { - return true; - } - } - - // Check if there is an HDFS scan - for (int i = 0; i < fragment.plan.nodes.size(); ++i) { - TPlanNode plan_node = static_cast(fragment.plan.nodes[i]); - if (plan_node.__isset.hdfs_scan_node) { - return true; - } - } - - return false; + // Check if there is a table sink + if (fragment.__isset.output_sink) { + if (fragment.output_sink.__isset.table_sink + && fragment.output_sink.type == TDataSinkType::TABLE_SINK) { + return true; + } + } + + // Check if there is an HDFS scan + BOOST_FOREACH(const TPlanNode& plan_node, fragment.plan.nodes) { + if (plan_node.__isset.hdfs_scan_node) + return true; + } + + return false; } -void Coordinator::SetExecPlanDescriptorTable(const TPlanFragment& fragment, TExecPlanFragmentParams* rpc_params) { - std::string table_name = ""; - std::string db_name = ""; - TDescriptorTable thrift_desc_tbl; - bool useCustomerDescriptor = false; - - // Always add the Tuple and Slot descriptors - for (size_t j = 0; j < desc_tbl_.tupleDescriptors.size(); ++j) { - TTupleDescriptor tupleDesc = desc_tbl_.tupleDescriptors[j]; - thrift_desc_tbl.tupleDescriptors.push_back(tupleDesc); - } - - for (size_t k = 0; k < desc_tbl_.slotDescriptors.size(); ++k) { - TSlotDescriptor slotDescriptor = desc_tbl_.slotDescriptors[k]; - thrift_desc_tbl.slotDescriptors.push_back(slotDescriptor); - thrift_desc_tbl.__isset.slotDescriptors = true; - useCustomerDescriptor = true; - } - - // If we only need the Tuple and Slot decriptors set them and return - if (!NeedTableDescriptor(fragment)) { - rpc_params->__set_desc_tbl(thrift_desc_tbl); - return; - } - - // Check TTableDescriptor for Table sink - if (fragment.__isset.output_sink) { - if (fragment.output_sink.__isset.table_sink && fragment.output_sink.type == TDataSinkType::TABLE_SINK) { - for (size_t i = 0; i < desc_tbl_.tableDescriptors.size(); ++i) { - const TTableDescriptor& tableDesc = desc_tbl_.tableDescriptors[i]; - if (tableDesc.tableType == TTableType::HDFS_TABLE) { - if (tableDesc.id == fragment.output_sink.table_sink.target_table_id) { - thrift_desc_tbl.tableDescriptors.push_back(tableDesc); - thrift_desc_tbl.__isset.tableDescriptors = true; - useCustomerDescriptor= true; - } - } - } - } - } - - // Check TTableDescriptor for HDFS Scans - for (int i = 0; i < fragment.plan.nodes.size(); ++i) { - TPlanNode plan_node = static_cast(fragment.plan.nodes[i]); - if (plan_node.__isset.hdfs_scan_node) { - table_name = plan_node.hdfs_scan_node.table_name; - db_name = plan_node.hdfs_scan_node.db_name; - for (size_t i = 0; i < desc_tbl_.tableDescriptors.size(); ++i) { - const TTableDescriptor& tableDesc = desc_tbl_.tableDescriptors[i]; - if (tableDesc.tableType == TTableType::HDFS_TABLE) { - if (!db_name.empty() && !table_name.empty()) { - if (table_name.compare(tableDesc.tableName) == 0 && db_name.compare(tableDesc.dbName) == 0) { - thrift_desc_tbl.tableDescriptors.push_back(tableDesc); - thrift_desc_tbl.__isset.tableDescriptors = true; - useCustomerDescriptor= true; - } - } - } - } - } - } - - if (useCustomerDescriptor == true) { - // Use the trimmed down TDescriptorTable - rpc_params->__set_desc_tbl(thrift_desc_tbl); - } else { - rpc_params->__set_desc_tbl(desc_tbl_); - } +void Coordinator::SetExecPlanDescriptorTable(const TPlanFragment& fragment, + TExecPlanFragmentParams* rpc_params) { + string table_name = ""; + string db_name = ""; + TDescriptorTable thrift_desc_tbl; + + // If a matching table sink and/or an HDFS table scan were found + // use the trimmed TDescriptorTable + bool use_custom_tdescriptor_tbl = false; + + // Always add the Tuple and Slot descriptors + for (size_t j = 0; j < desc_tbl_.tupleDescriptors.size(); ++j) { + TTupleDescriptor tupleDesc = desc_tbl_.tupleDescriptors[j]; + thrift_desc_tbl.tupleDescriptors.push_back(tupleDesc); + } + + for (size_t k = 0; k < desc_tbl_.slotDescriptors.size(); ++k) { + TSlotDescriptor slotDescriptor = desc_tbl_.slotDescriptors[k]; + thrift_desc_tbl.slotDescriptors.push_back(slotDescriptor); + thrift_desc_tbl.__isset.slotDescriptors = true; + } + + // If we only need the Tuple and Slot decriptors set them and return + if (!NeedTableDescriptor(fragment)) { + rpc_params->__set_desc_tbl(thrift_desc_tbl); + return; + } + + // Check TTableDescriptor for Table sink + if (fragment.__isset.output_sink) { + if (fragment.output_sink.__isset.table_sink + && fragment.output_sink.type == TDataSinkType::TABLE_SINK) { + for (size_t i = 0; i < desc_tbl_.tableDescriptors.size(); ++i) { + const TTableDescriptor& tableDesc = desc_tbl_.tableDescriptors[i]; + if (tableDesc.tableType == TTableType::HDFS_TABLE) { + if (tableDesc.id == fragment.output_sink.table_sink.target_table_id) { + thrift_desc_tbl.tableDescriptors.push_back(tableDesc); + thrift_desc_tbl.__isset.tableDescriptors = true; + use_custom_tdescriptor_tbl = true; + } + } + } + } + } + + // Check TTableDescriptor for HDFS Scans + for (int i = 0; i < fragment.plan.nodes.size(); ++i) { + TPlanNode plan_node = static_cast(fragment.plan.nodes[i]); + if (plan_node.__isset.hdfs_scan_node) { + table_name = plan_node.hdfs_scan_node.table_name; + db_name = plan_node.hdfs_scan_node.db_name; + for (size_t i = 0; i < desc_tbl_.tableDescriptors.size(); ++i) { + const TTableDescriptor& tableDesc = desc_tbl_.tableDescriptors[i]; + if (tableDesc.tableType == TTableType::HDFS_TABLE) { + if (!db_name.empty() && !table_name.empty()) { + if (table_name.compare(tableDesc.tableName) == 0 + && db_name.compare(tableDesc.dbName) == 0) { + thrift_desc_tbl.tableDescriptors.push_back(tableDesc); + thrift_desc_tbl.__isset.tableDescriptors = true; + use_custom_tdescriptor_tbl = true; + } + } + } + } + } + } + + if (use_custom_tdescriptor_tbl) { + rpc_params->__set_desc_tbl(thrift_desc_tbl); + } else { + rpc_params->__set_desc_tbl(desc_tbl_); + } } } diff --git be/src/runtime/coordinator.h be/src/runtime/coordinator.h index 3f14388..794c373 100644 --- be/src/runtime/coordinator.h +++ be/src/runtime/coordinator.h @@ -177,9 +177,10 @@ class Coordinator { SpinLock& GetExecSummaryLock() const { return exec_summary_lock_; } - // Checks if the current TPlanFragment needs any TDescriptorTable(s) - // For intermediate fragments where there is no scan or sink the TDescriptorTable(s) are not needed. - static bool NeedTableDescriptor(const TPlanFragment& fragment); + // Checks if the current TPlanFragment needs any TDescriptorTable(s) + // For instance intermediate fragments where there is no scan or sink + // the TDescriptorTable(s) are not needed. + static bool NeedTableDescriptor(const TPlanFragment& fragment); private: class BackendExecState; @@ -267,7 +268,8 @@ class Coordinator { ObjectPool* obj_pool() { return obj_pool_.get(); } // Sets the TDescriptorTable(s) for the current fragment - void SetExecPlanDescriptorTable(const TPlanFragment& fragment, TExecPlanFragmentParams* rpc_params); + void SetExecPlanDescriptorTable(const TPlanFragment& fragment, + TExecPlanFragmentParams* rpc_params); /// True if execution has completed, false otherwise. bool execution_completed_; diff --git be/src/runtime/descriptors.cc be/src/runtime/descriptors.cc index 15b6bfd..cc3698b 100644 --- be/src/runtime/descriptors.cc +++ be/src/runtime/descriptors.cc @@ -451,81 +451,75 @@ string RowDescriptor::DebugString() const { return ss.str(); } -Status DescriptorTbl::Create(ObjectPool* pool, - const TDescriptorTable& thrift_tbl, DescriptorTbl** tbl, - std::string tableName, std::string dbName) { - *tbl = pool->Add(new DescriptorTbl()); - // deserialize table descriptors first, they are being referenced by tuple descriptors - for (size_t i = 0; i < thrift_tbl.tableDescriptors.size(); ++i) { - const TTableDescriptor& tdesc = thrift_tbl.tableDescriptors[i]; - TableDescriptor* desc = NULL; - switch (tdesc.tableType) { - case TTableType::HDFS_TABLE: - if (tableName.compare(tdesc.tableName) == 0 && dbName.compare(tdesc.dbName) == 0) { - desc = pool->Add(new HdfsTableDescriptor(tdesc, pool)); - } else { - desc = pool->Add(new HdfsTableDescriptor(tdesc, pool)); - } - - break; - case TTableType::HBASE_TABLE: - desc = pool->Add(new HBaseTableDescriptor(tdesc)); - break; - case TTableType::DATA_SOURCE_TABLE: - desc = pool->Add(new DataSourceTableDescriptor(tdesc)); - break; - default: - DCHECK(false) << "invalid table type: " << tdesc.tableType; - } - - if (desc == NULL) - continue; - - (*tbl)->tbl_desc_map_[tdesc.id] = desc; - } - - return Status::OK(); -} - -Status DescriptorTbl::CreateTuples(ObjectPool* pool, - const TDescriptorTable& thrift_tbl, DescriptorTbl** tbl) { - - if(*tbl == NULL) +Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl, + DescriptorTbl** tbl, bool add_table_descriptor, string table_name, string db_name) { *tbl = pool->Add(new DescriptorTbl()); - for (size_t i = 0; i < thrift_tbl.tupleDescriptors.size(); ++i) { - const TTupleDescriptor& tdesc = thrift_tbl.tupleDescriptors[i]; - TupleDescriptor* desc = pool->Add(new TupleDescriptor(tdesc)); - // fix up table pointer - TableDescriptor* table_desc = (*tbl)->GetTableDescriptor(tdesc.tableId); - if (tdesc.__isset.tableId && table_desc !=NULL) { - desc->table_desc_ = table_desc; - DCHECK(desc->table_desc_ != NULL); - } - (*tbl)->tuple_desc_map_[tdesc.id] = desc; - } - - for (size_t i = 0; i < thrift_tbl.slotDescriptors.size(); ++i) { - const TSlotDescriptor& tdesc = thrift_tbl.slotDescriptors[i]; - // Tuple descriptors are already populated in tbl - TupleDescriptor* parent = (*tbl)->GetTupleDescriptor(tdesc.parent); - if (parent == NULL) - continue; - TupleDescriptor* collection_item_descriptor = tdesc.__isset.itemTupleId ? - (*tbl)->GetTupleDescriptor(tdesc.itemTupleId) : NULL; - SlotDescriptor* slot_d = pool->Add( - new SlotDescriptor(tdesc, parent, collection_item_descriptor)); - (*tbl)->slot_desc_map_[tdesc.id] = slot_d; - - // link to parent - TupleDescriptorMap::iterator entry = (*tbl)->tuple_desc_map_.find(tdesc.parent); - if (entry == (*tbl)->tuple_desc_map_.end()) { - return Status("unknown tid in slot descriptor msg"); - } - entry->second->AddSlot(slot_d); - } - - return Status::OK(); + if (add_table_descriptor) { + // deserialize table descriptors first, they are being referenced by tuple descriptors + for (size_t i = 0; i < thrift_tbl.tableDescriptors.size(); ++i) { + const TTableDescriptor& tdesc = thrift_tbl.tableDescriptors[i]; + TableDescriptor* desc = NULL; + switch (tdesc.tableType) { + case TTableType::HDFS_TABLE: + if (table_name.compare(tdesc.tableName) == 0 + && db_name.compare(tdesc.dbName) == 0) { + desc = pool->Add(new HdfsTableDescriptor(tdesc, pool)); + } else { + desc = pool->Add(new HdfsTableDescriptor(tdesc, pool)); + } + + break; + case TTableType::HBASE_TABLE: + desc = pool->Add(new HBaseTableDescriptor(tdesc)); + break; + case TTableType::DATA_SOURCE_TABLE: + desc = pool->Add(new DataSourceTableDescriptor(tdesc)); + break; + default: + DCHECK(false) << "invalid table type: " << tdesc.tableType; + } + + if (desc == NULL) + continue; + + (*tbl)->tbl_desc_map_[tdesc.id] = desc; + } + } + + for (size_t i = 0; i < thrift_tbl.tupleDescriptors.size(); ++i) { + const TTupleDescriptor& tdesc = thrift_tbl.tupleDescriptors[i]; + TupleDescriptor* desc = pool->Add(new TupleDescriptor(tdesc)); + // fix up table pointer + TableDescriptor* table_desc = (*tbl)->GetTableDescriptor(tdesc.tableId); + if (tdesc.__isset.tableId && table_desc != NULL) { + desc->table_desc_ = table_desc; + DCHECK(desc->table_desc_ != NULL); + } + (*tbl)->tuple_desc_map_[tdesc.id] = desc; + } + + for (size_t i = 0; i < thrift_tbl.slotDescriptors.size(); ++i) { + const TSlotDescriptor& tdesc = thrift_tbl.slotDescriptors[i]; + // Tuple descriptors are already populated in tbl + TupleDescriptor* parent = (*tbl)->GetTupleDescriptor(tdesc.parent); + if (parent == NULL) + continue; + TupleDescriptor* collection_item_descriptor = + tdesc.__isset.itemTupleId ? (*tbl)->GetTupleDescriptor(tdesc.itemTupleId) : NULL; + SlotDescriptor* slot_d = pool->Add( + new SlotDescriptor(tdesc, parent, collection_item_descriptor)); + (*tbl)->slot_desc_map_[tdesc.id] = slot_d; + + // link to parent + TupleDescriptorMap::iterator entry = (*tbl)->tuple_desc_map_.find(tdesc.parent); + if (entry == (*tbl)->tuple_desc_map_.end()) { + return Status("unknown tid in slot descriptor msg"); + } + entry->second->AddSlot(slot_d); + } + + return Status::OK(); } TableDescriptor* DescriptorTbl::GetTableDescriptor(TableId id) const { diff --git be/src/runtime/descriptors.h be/src/runtime/descriptors.h index 6b71915..5edc33a 100644 --- be/src/runtime/descriptors.h +++ be/src/runtime/descriptors.h @@ -405,14 +405,11 @@ class DescriptorTbl { public: /// Creates a descriptor tbl within 'pool' from thrift_tbl and returns it via 'tbl'. /// Returns OK on success, otherwise error (in which case 'tbl' will be unset). + /// Table & db names are used to find the TTableDescriptor matching the + /// current plan fragment. static Status Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl, - DescriptorTbl** tbl, std::string tableName = "", - std::string dbName = ""); - - /// Creates a descriptor tbl within 'pool' from thrift_tbl and returns it via 'tbl'. - /// Returns OK on success, otherwise error (in which case 'tbl' will be unset). - static Status CreateTuples(ObjectPool* pool, const TDescriptorTable& thrift_tbl, - DescriptorTbl** tbl); + DescriptorTbl** tbl,bool add_table_descriptor = true, string table_name = "", + string db_name = ""); TableDescriptor* GetTableDescriptor(TableId id) const; TupleDescriptor* GetTupleDescriptor(TupleId id) const; diff --git be/src/runtime/plan-fragment-executor.cc be/src/runtime/plan-fragment-executor.cc index cc751f5..746c7b1 100644 --- be/src/runtime/plan-fragment-executor.cc +++ be/src/runtime/plan-fragment-executor.cc @@ -186,28 +186,25 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { runtime_state_->resource_pool())); // TODO : This code assumes that there is a single HdfsScanNode per fragment plan. - // When bucketed tables are introduced look into chaning this. + // When bucketed tables are introduced look into changing this. std::string table_name = ""; - std::string db_name = ""; - for (int i = 0; i < request.fragment.plan.nodes.size(); ++i) { - TPlanNode plan_node = static_cast(request.fragment.plan.nodes[i]); - if (plan_node.__isset.hdfs_scan_node) { - table_name = plan_node.hdfs_scan_node.table_name; - db_name = plan_node.hdfs_scan_node.db_name; - } - } + std::string db_name = ""; + for (int i = 0; i < request.fragment.plan.nodes.size(); ++i) { + TPlanNode plan_node = static_cast(request.fragment.plan.nodes[i]); + if (plan_node.__isset.hdfs_scan_node) { + table_name = plan_node.hdfs_scan_node.table_name; + db_name = plan_node.hdfs_scan_node.db_name; + } + } // set up desc tbl DescriptorTbl* desc_tbl = NULL; - if (Coordinator::NeedTableDescriptor(request.fragment)) { - DCHECK(request.__isset.desc_tbl); - RETURN_IF_ERROR( - DescriptorTbl::Create(obj_pool(), request.desc_tbl, &desc_tbl, table_name, db_name)); - } + bool need_table_descriptor = Coordinator::NeedTableDescriptor(request.fragment); + if (need_table_descriptor) DCHECK(request.__isset.desc_tbl); - // Always add the Tuple and Slot descriptors - RETURN_IF_ERROR( - DescriptorTbl::CreateTuples(obj_pool(), request.desc_tbl, &desc_tbl)); + RETURN_IF_ERROR( + DescriptorTbl::Create(obj_pool(), request.desc_tbl, &desc_tbl, + need_table_descriptor, table_name, db_name)); runtime_state_->set_desc_tbl(desc_tbl); VLOG_QUERY << "descriptor table for fragment=" << request.fragment_instance_ctx.fragment_instance_id