摘要：在前面分析过了节点相关的应用和功能，也在其中分析过一些节点的数据定义情况。本文就对节点的数据定义进行一个更详细具体的分析说明，特别是对一些应用上的细节展开说明一下。知其然，然后知其所以然。节点的定义，……

一、节点的定义
在前面分析过了节点相关的应用和功能，也在其中分析过一些节点的数据定义情况。本文就对节点的数据定义进行一个更详细具体的分析说明，特别是对一些应用上的细节展开说明一下。知其然，然后知其所以然。 节点的定义，基本都是以模板进行的。所以希望大家有一些基本的模板的知识，否则看起来还是有些麻烦的。官方文档这样说：“A node is a class that inherits from oneapi::tbb::flow::graph_node and also typically inherits from oneapi::tbb::flow::sender<T>, oneapi::tbb::flow::receiver<T> or both.”。可以理解为，在TBB中创建节点，一般是继承自上面所说的三个数据结构的定义。当然，在TBB中已经定义了好几种节点类型，一般情况下是不需要自己主动去自定义自己的节点类型了。 节点的数据结构一般分为三块：
```cpp
template <typename Body> function_node(graph &g, size_t concurrency, Body body)
```
也就是图、并行度和不同类型的执行体。图主要是用来限制节点的应用范围，即这个节点在哪个图中进行工作；concurrency用来限制节点并发调用的次数，可以是1（1代表串行）到N次；而Body则表示开发者定义的消息处理函数，它可以是函数、Lambda表达式等。通过这三项，基本就可以完成整个TBB节点的工作处理。
二、基本数据结构
下面先看一下基础的三个类型节点的定义中的第一个graph_node
```cpp
//! The base of all graph nodes.
class graph_node : no_copy {
    friend class graph;
    template<typename C, typename N>
    friend class graph_iterator;

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    friend class get_graph_helper;
#endif

protected:
    graph& my_graph;
    graph& graph_reference() const {
        // TODO revamp: propagate graph_reference() method to all the reference places.
        return my_graph;
    }
    graph_node* next = nullptr;
    graph_node* prev = nullptr;

public:
    explicit graph_node(graph& g);

    virtual ~graph_node();

protected:
    // performs the reset on an individual node.
    virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
}; // class graph_node
```
graph_node在上面的注释中已经说明了其是TBB图中所有节点的基类。所以看它的实际定义其实非常简单，除了图相关的友元和变量外，只有几个节点的前驱和后继指针。其显式的构造函数要求必须输入一个图的数据对象。这也符合基类是一个抽象的设计。 再看一下相关的sender:
```cpp
//! Pure virtual template class that defines a sender of messages of type T
template< typename T >
class sender {
public:
    virtual ~sender() {}

    //! Request an item from the sender
    virtual bool try_get( T & ) { return false; }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    virtual bool try_get( T &, message_metainfo& ) { return false; }
#endif

    //! Reserves an item in the sender
    virtual bool try_reserve( T & ) { return false; }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    virtual bool try_reserve( T &, message_metainfo& ) { return false; }
#endif

    //! Releases the reserved item
    virtual bool try_release( ) { return false; }

    //! Consumes the reserved item
    virtual bool try_consume( ) { return false; }

protected:
    //! The output type of this sender
    typedef T output_type;

    //! The successor type for this node
    typedef receiver<T> successor_type;

    //! Add a new successor to this node
    virtual bool register_successor( successor_type &r ) = 0;

    //! Removes a successor from this node
    virtual bool remove_successor( successor_type &r ) = 0;

    template<typename C>
    friend bool register_successor(sender<C>& s, receiver<C>& r);

    template<typename C>
    friend bool remove_successor (sender<C>& s, receiver<C>& r);
}; // class sender<T>
```
上面的类注释也说明了它是一个发送消息的纯虚拟的模板类。而在TBB中，节点的一个重要的功能就是能够收发消息，而消息的收发又受限于图的约束（边和节点）。既然是一个纯虚类，那它就是一个接口类。这也符合消息传递的接口性的设计原则。在其中可以看到消息处理的函数接口，如try_get等。而与其相类似的是receiver:
```cpp
//! Pure virtual template class that defines a receiver of messages of type T
template< typename T >
class receiver {
private:
    template <typename... TryPutTaskArgs>
    bool internal_try_put(const T& t, TryPutTaskArgs&&... args) {
        graph_task* res = try_put_task(t, std::forward<TryPutTaskArgs>(args)...);
        if (!res) return false;
        if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
        return true;
    }

public:
    //! Destructor
    virtual ~receiver() {}

    //! Put an item to the receiver
    bool try_put( const T& t ) {
        return internal_try_put(t);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    //! Put an item to the receiver and wait for completion
    bool try_put_and_wait( const T& t ) {
        // Since try_put_and_wait is a blocking call, it is safe to create wait_context on stack
        d1::wait_context_vertex msg_wait_vertex{};

        bool res = internal_try_put(t, message_metainfo{message_metainfo::waiters_type{&msg_wait_vertex}});
        if (res) {
            __TBB_ASSERT(graph_reference().my_context != nullptr, "No wait_context associated with the Flow Graph");
            wait(msg_wait_vertex.get_context(), *graph_reference().my_context);
        }
        return res;
    }
#endif

    //! put item to successor; return task to run the successor if possible.
protected:
    //! The input type of this receiver
    typedef T input_type;

    //! The predecessor type for this node
    typedef sender<T> predecessor_type;

    template< typename R, typename B > friend class run_and_put_task;
    template< typename X, typename Y > friend class broadcast_cache;
    template< typename X, typename Y > friend class round_robin_cache;
    virtual graph_task *try_put_task(const T& t) = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    virtual graph_task *try_put_task(const T& t, const message_metainfo&) = 0;
#endif
    virtual graph& graph_reference() const = 0;

    template<typename TT, typename M> friend class successor_cache;
    virtual bool is_continue_receiver() { return false; }

    // TODO revamp: reconsider the inheritance and move node priority out of receiver
    virtual node_priority_t priority() const { return no_priority; }

    //! Add a predecessor to the node
    virtual bool register_predecessor( predecessor_type & ) { return false; }

    //! Remove a predecessor from the node
    virtual bool remove_predecessor( predecessor_type & ) { return false; }

    template <typename C>
    friend bool register_predecessor(receiver<C>& r, sender<C>& s);
    template <typename C>
    friend bool remove_predecessor (receiver<C>& r, sender<C>& s);
}; // class receiver<T>
```
在下面的try_put函数中可以看到TBB内部的处理过程：
```cpp
    //! Put an item to the receiver
    bool try_put( const T& t ) {
        return internal_try_put(t);
    }

    template <typename... TryPutTaskArgs>
    bool internal_try_put(const T& t, TryPutTaskArgs&&... args) {
        graph_task* res = try_put_task(t, std::forward<TryPutTaskArgs>(args)...);
        if (!res) return false;
        if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
        return true;
    }
```
看明白了这三个基础的数据结构，也就明白了为什么官网说都要从这三个类中继承。主要原因就在于，一个节点中抛开图的相关内容后，关于节点的相关控制和消息处理，这三个类或多或少的都有。也就是说，除非完全手动创造代码，那最简单方便的就是从这三个类继承一下。
三、节点预定义数据结构
1、输入节点
```cpp
//! An executable node that acts as a source, i.e. it has no predecessors
template < typename Output >
    __TBB_requires(std::copyable<Output>)
class input_node : public graph_node, public sender< Output > {
public:
    //! The type of the output message, which is complete
    typedef Output output_type;

    //! The type of successors of this node
    typedef typename sender<output_type>::successor_type successor_type;

    // Input node has no input type
    typedef null_type input_type;

    //! Constructor for a node with a successor
    template< typename Body >
        __TBB_requires(input_node_body<Body, Output>)
    __TBB_NOINLINE_SYM input_node( graph &g, Body body )
        : graph_node(g), my_active(false)
        , my_body( new input_body_leaf< output_type, Body>(body) )
        , my_init_body( new input_body_leaf< output_type, Body>(body) )
        , my_successors(this), my_reserved(false), my_has_cached_item(false)
    {
        fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
                           static_cast<sender<output_type> *>(this), this->my_body);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename Body, typename... Successors>
        __TBB_requires(input_node_body<Body, Output>)
    input_node( const node_set<order::preceding, Successors...>& successors, Body body )
        : input_node(successors.graph_reference(), body)
    {
        make_edges(*this, successors);
    }
#endif

    //! Copy constructor
    __TBB_NOINLINE_SYM input_node( const input_node& src )
        : graph_node(src.my_graph), sender<Output>()
        , my_active(false)
        , my_body(src.my_init_body->clone()), my_init_body(src.my_init_body->clone())
        , my_successors(this), my_reserved(false), my_has_cached_item(false)
    {
        fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
                           static_cast<sender<output_type> *>(this), this->my_body);
    }

    //! The destructor
    ~input_node() { delete my_body; delete my_init_body; }

    //! Add a new successor to this node
    bool register_successor( successor_type &r ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.register_successor(r);
        if ( my_active )
            spawn_put();
        return true;
    }

    //! Removes a successor from this node
    bool remove_successor( successor_type &r ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.remove_successor(r);
        return true;
    }

    //! Request an item from the node
    bool try_get( output_type &v ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved )
            return false;

        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_has_cached_item = false;
            return true;
        }
        // we've been asked to provide an item, but we have none. enqueue a task to
        // provide one.
        if ( my_active )
            spawn_put();
        return false;
    }

    //! Reserves an item.
    bool try_reserve( output_type &v ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }

        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
private:
    bool try_reserve( output_type& v, message_metainfo& ) override {
        return try_reserve(v);
    }

    bool try_get( output_type& v, message_metainfo& ) override {
        return try_get(v);
    }
public:
#endif

    //! Release a reserved item.
    /** true = item has been released and so remains in sender, dest must request or reserve future items */
    bool try_release( ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
        my_reserved = false;
        if(!my_successors.empty())
            spawn_put();
        return true;
    }

    //! Consumes a reserved item
    bool try_consume( ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
        my_reserved = false;
        my_has_cached_item = false;
        if ( !my_successors.empty() ) {
            spawn_put();
        }
        return true;
    }

    //! Activates a node that was created in the inactive state
    void activate() {
        spin_mutex::scoped_lock lock(my_mutex);
        my_active = true;
        if (!my_successors.empty())
            spawn_put();
    }

    template<typename Body>
    Body copy_function_object() {
        input_body<output_type> &body_ref = *this->my_body;
        return dynamic_cast< input_body_leaf<output_type, Body> & >(body_ref).get_body();
    }

protected:
    //! resets the input_node to its initial state
    void reset_node( reset_flags f) override {
        my_active = false;
        my_reserved = false;
        my_has_cached_item = false;

        if(f & rf_clear_edges) my_successors.clear();
        if(f & rf_reset_bodies) {
            input_body<output_type> *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

private:
    spin_mutex my_mutex;
    bool my_active;
    input_body<output_type> *my_body;
    input_body<output_type> *my_init_body;
    broadcast_cache< output_type > my_successors;
    bool my_reserved;
    bool my_has_cached_item;
    output_type my_cached_item;

    // used by apply_body_bypass, can invoke body of node.
    bool try_reserve_apply_body(output_type &v) {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }
        if ( !my_has_cached_item ) {
            d1::flow_control control;

            fgt_begin_body( my_body );

            my_cached_item = (*my_body)(control);
            my_has_cached_item = !control.is_pipeline_stopped;

            fgt_end_body( my_body );
        }
        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }

    graph_task* create_put_task() {
        d1::small_object_allocator allocator{};
        typedef input_node_task_bypass< input_node<output_type> > task_type;
        graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
        return t;
    }

    //! Spawns a task that applies the body
    void spawn_put( ) {
        if(is_graph_active(this->my_graph)) {
            spawn_in_graph_arena(this->my_graph, *create_put_task());
        }
    }

    friend class input_node_task_bypass< input_node<output_type> >;
    //! Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
    graph_task* apply_body_bypass( ) {
        output_type v;
        if ( !try_reserve_apply_body(v) )
            return nullptr;

        graph_task *last_task = my_successors.try_put_task(v);
        if ( last_task )
            try_consume();
        else
            try_release();
        return last_task;
    }
}; // class input_node
```
输入节点比较复杂一些，但仔细看其中的代码，其实没有什么特别的，其实具体到最后就是任务的处理了。毕竟其属于对消息的接收处理。

2、功能节点
```cpp
//! Implements a function node that supports Input -> Output
template<typename Input, typename Output = continue_msg, typename Policy = queueing>
    __TBB_requires(std::default_initializable<Input> &&
                   std::copy_constructible<Input> &&
                   std::copy_constructible<Output>)
class function_node
    : public graph_node
    , public function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
    , public function_output<Output>
{
    typedef cache_aligned_allocator<Input> internals_allocator;

public:
    typedef Input input_type;
    typedef Output output_type;
    typedef function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
    typedef function_input_queue<input_type, internals_allocator> input_queue_type;
    typedef function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    using input_impl_type::my_predecessors;

    //! Constructor
    // input_queue_type is allocated here, but destroyed in the function_input_base.
    // TODO: pass the graph_buffer_policy to the function_input_base so it can all
    // be done in one place. This would be an interface-breaking change.
    template< typename Body >
        __TBB_requires(function_node_body<Body, Input, Output>)
    __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
                   Body body, Policy = Policy(), node_priority_t a_priority = no_priority )
        : graph_node(g), input_impl_type(g, concurrency, body, a_priority),
          fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

    template <typename Body>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( graph& g, size_t concurrency, Body body, node_priority_t a_priority )
        : function_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename Body, typename... Args>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
                   Policy p = Policy(), node_priority_t a_priority = no_priority )
        : function_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority )
        : function_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor
    __TBB_NOINLINE_SYM function_node( const function_node& src ) :
        graph_node(src.my_graph),
        input_impl_type(src),
        fOutput_type(src.my_graph) {
        fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    using input_impl_type::try_put_task;

    broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }

    void reset_node(reset_flags f) override {
        input_impl_type::reset_function_input(f);
        // TODO: use clear() instead.
        if(f & rf_clear_edges) {
            successors().clear();
            my_predecessors.clear();
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
        __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
    }

}; // class function_node
```
功能节点中可以看一看function_input等几个类似的类，做为单输入输出的节点，其实麻烦就在于拿到消息后要根据策略来进行处理。

3、输出节点
```cpp
//! Forwards messages of type T to all successors
template <typename T>
class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
private:
    broadcast_cache<input_type> my_successors;
public:
    __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g), my_successors(this) {
        fgt_node( CODEPTR(), FLOW_BROADCAST_NODE, &this->my_graph,
                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename... Args>
    broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) : broadcast_node(src.my_graph) {}

    //! Adds a successor
    bool register_successor( successor_type &r ) override {
        my_successors.register_successor( r );
        return true;
    }

    //! Removes s as a successor
    bool remove_successor( successor_type &r ) override {
        my_successors.remove_successor( r );
        return true;
    }

private:
    graph_task* try_put_task_impl(const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
        graph_task* new_task = my_successors.try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
        if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
        return new_task;
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    //! build a task to run the successor if possible. Default is old behavior.
    graph_task* try_put_task(const T& t) override {
        return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    graph_task* try_put_task(const T& t, const message_metainfo& metainfo) override {
        return try_put_task_impl(t, metainfo);
    }
#endif

    graph& graph_reference() const override {
        return my_graph;
    }

    void reset_node(reset_flags f) override {
        if (f & rf_clear_edges) {
            my_successors.clear();
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
    }
}; // broadcast_node
```
而广播节点则更容易为开发者理解，它其实就是将消息转发的一个节点。不做缓存，这和实际的广播也是相同的。广播完成，消息也就没了。 当然，除了上面的三种节点，在TBB中还有不少的节点类型，这里只截取这三种比较有代表性的来给大家分析一下。其实通过这些节点的类型分析，是不是考虑可以更抽象一层的对其进行封装，而不是简单的应用这些节点类型。让这些节点被抽象到更高层后，可能应用会更方便，也可能更符合当前的实际应用场景。
四、总结
源码之前，了无秘密。这句话太实在了。一个高手的再好的设计，再牛的编码，最终都得体现到代码中去。否则这种思想的体现就无法实现，而无法实现的思想，至少在实践这个角度上就无法让大多数人学会。计算机技术的特点就是如此，代码就是王道。