当前位置: 首页 > news >正文

兰州网站排名推广镇江集团网站建设

兰州网站排名推广,镇江集团网站建设,乐清人才网官方网站,网站建设与管理 课件大家看可以看ElasticSearch源码#xff1a;Rest请求与Master节点处理流程#xff08;1#xff09; 这个图非常好#xff0c;下午的讲解代码在各个类和方法之间流转#xff0c;都体现这个图上 一、Master节点处理请求的逻辑1、节点(数据节点)要和主节点进行通讯#xff0… 大家看可以看ElasticSearch源码Rest请求与Master节点处理流程1 这个图非常好下午的讲解代码在各个类和方法之间流转都体现这个图上 一、Master节点处理请求的逻辑1、节点(数据节点)要和主节点进行通讯需要继承自基类MasterNodeRequest2、Master节点处理来自客户端的请求(以创建索引请求举例)(1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)(2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法 二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)1、首先通过nodeClient执行doExecute()2、创建一个task任务异步执行TransportAction3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引 一、Master节点处理请求的逻辑 不是所有的请求都需要Master节点处理但是有些请求必须让Master节点处理比如创建index下面的3就是用创建索引做的示例 1、节点(数据节点)要和主节点进行通讯需要继承自基类MasterNodeRequest 主节点在 Elasticsearch 集群中负责集群的管理和协调工作。当节点需要执行某些操作时它将创建相应的 MasterNodeRequest 实现类的实例填充请求的参数和数据并将其发送给主节点。主节点根据不同的 MasterNodeRequest 实现类的类型执行相应的操作 /*** A based request for master based operation.* 在master上*/ public abstract class MasterNodeRequestRequest extends MasterNodeRequestRequest extends ActionRequest {public static final TimeValue DEFAULT_MASTER_NODE_TIMEOUT TimeValue.timeValueSeconds(30);protected TimeValue masterNodeTimeout DEFAULT_MASTER_NODE_TIMEOUT;protected MasterNodeRequest() {}protected MasterNodeRequest(StreamInput in) throws IOException {super(in);masterNodeTimeout in.readTimeValue();}Overridepublic void writeTo(StreamOutput out) throws IOException {super.writeTo(out);out.writeTimeValue(masterNodeTimeout);}/*** A timeout value in case the master has not been discovered yet or disconnected.*/SuppressWarnings(unchecked)public final Request masterNodeTimeout(TimeValue timeout) {this.masterNodeTimeout timeout;return (Request) this;}/*** A timeout value in case the master has not been discovered yet or disconnected.*/public final Request masterNodeTimeout(String timeout) {return masterNodeTimeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() .masterNodeTimeout));}public final TimeValue masterNodeTimeout() {return this.masterNodeTimeout;} }这里有点模糊后面学到数据节点向主节点请求或者同步什么时我再挂个链接 2、Master节点处理来自客户端的请求(以创建索引请求举例) (1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction) 至于请求如何找到RestCreateIndexAction的可以参考Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码 ServerlessScope(Scope.PUBLIC) public class RestCreateIndexAction extends BaseRestHandler {//省略代码 Overridepublic ListRoute routes() {return List.of(new Route(PUT, /{index}));}Overridepublic String getName() {return create_index_action;}Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {CreateIndexRequest createIndexRequest;if (request.getRestApiVersion() RestApiVersion.V_7) {createIndexRequest prepareRequestV7(request);} else {createIndexRequest prepareRequest(request);}return channel - client.admin().indices().create(createIndexRequest, new RestToXContentListener(channel));}//省略代码 } (2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法 TransportMasterNodeAction 主要用于处理来自节点的各种管理操作请求如创建索引、删除索引、更新集群设置等。 当节点(数据节点)发送请求到主节点时请求会被传递给相应的 TransportMasterNodeAction 实现类进行处理。实现类会根据请求的类型执行相应的操作逻辑并返回执行结果给主节点。 /*** 需要在主节点上执行的操作的基类。* A base class for operations that needs to be performed on the master node.**/ public abstract class TransportMasterNodeActionRequest extends MasterNodeRequestRequest, Response extends ActionResponse extendsHandledTransportActionRequest, ResponseimplementsActionWithReservedStateRequest {//省略代码 }/*** 创建索引操作*/ public class TransportCreateIndexAction extends TransportMasterNodeActionCreateIndexRequest, CreateIndexResponse {Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListenerCreateIndexResponse listener) {//省略代码createIndexService.createIndex(updateRequest,listener.map(response - new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));} } 二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例) 这个场景为主节点和数据节点分离的情况 1、首先通过nodeClient执行doExecute() client.admin().indices().create中create方法调用IndicesAdmin类的create方法再调用execute方法的入参是 CreateIndexAction.INSTANCE static class IndicesAdmin implements IndicesAdminClient {Overridepublic void create(final CreateIndexRequest request, final ActionListenerCreateIndexResponse listener) {execute(CreateIndexAction.INSTANCE, request, listener);}Overridepublic Request extends ActionRequest, Response extends ActionResponse ActionFutureResponse execute(ActionTypeResponse action,Request request) {return client.execute(action, request);}} 调用的是AbstractClient的execute方法 /*** This is the single execution point of *all* clients.* 这是所有客户端的单个执行点。*/Overridepublic final Request extends ActionRequest, Response extends ActionResponse void execute(ActionTypeResponse action,Request request,ActionListenerResponse listener) {try {doExecute(action, request, listener);} catch (Exception e) {assert false : new AssertionError(e);listener.onFailure(e);}}doExecute方法调用的是NodeClient类的方法 Overridepublic Request extends ActionRequest, Response extends ActionResponse void doExecute(ActionTypeResponse action,Request request,ActionListenerResponse listener) {// Discard the task because the Client interface doesnt use it.try {executeLocally(action, request, listener);} catch (TaskCancelledException | IllegalArgumentException | IllegalStateException e) {listener.onFailure(e);}}/***在本地执行 {link ActionType}返回用于跟踪它的 {link Task}并链接 {link ActionListener}。如果在侦听响应时不需要访问任务则首选此方法。这是用于实现 {link 客户端} 接口的方法。*/public Request extends ActionRequest, Response extends ActionResponse Task executeLocally(ActionTypeResponse action,Request request,ActionListenerResponse listener) {//注册并执行任务return taskManager.registerAndExecute(transport,transportAction(action),request,localConnection,new SafelyWrappedActionListener(listener));} 之后调用TaskManager.java的方法 2、创建一个task任务异步执行TransportAction public Request extends ActionRequest, Response extends ActionResponse Task registerAndExecute(String type,TransportActionRequest, Response action,Request request,Transport.Connection localConnection,ActionListenerResponse taskListener) { //检查请求是否有父任务如果有则注册子连接。final Releasable unregisterChildNode;if (request.getParentTask().isSet()) {unregisterChildNode registerChildConnection(request.getParentTask().getId(), localConnection);} else {unregisterChildNode null;}//创建一个新的跟踪上下文try (var ignored threadPool.getThreadContext().newTraceContext()) {final Task task;//注册一个任务并捕获可能的取消任务异常。try {task register(type, action.actionName, request);} catch (TaskCancelledException e) {Releasables.close(unregisterChildNode);throw e;}//执行操作并在操作完成时调用相应的监听器。action.execute(task, request, new ActionListener() {Overridepublic void onResponse(Response response) {try {release();} finally {taskListener.onResponse(response);}}//根据操作的成功或失败情况取消子任务并释放资源。Overridepublic void onFailure(Exception e) {try {if (request.getParentTask().isSet()) {cancelChildLocal(request.getParentTask(), request.getRequestId(), e.toString());}release();} finally {taskListener.onFailure(e);}}Overridepublic String toString() {return this.getClass().getName() { taskListener }{ task };}private void release() {Releasables.close(unregisterChildNode, () - unregister(task));}});//返回任务对象。return task;}}下面是TransportAction.java类中的方法 /*** Use this method when the transport action should continue to run in the context of the current task* 当传输操作应继续在当前任务的上下文中运行时请使用此方法*/public final void execute(Task task, Request request, ActionListenerResponse listener) {final ActionRequestValidationException validationException;//对请求进行验证如果验证过程中出现异常则记录错误日志并通知监听器执行失败。try {validationException request.validate();} catch (Exception e) {assert false : new AssertionError(validating of request [ request ] threw exception, e);logger.warn(validating of request [ request ] threw exception, e);listener.onFailure(e);return;}if (validationException ! null) {listener.onFailure(validationException);return;}//检查是否存在任务且请求需要存储结果如果满足条件则创建一个TaskResultStoringActionListener实例用于在任务完成后将结果存储起来。if (task ! null request.getShouldStoreResult()) {listener new TaskResultStoringActionListener(taskManager, task, listener);}//创建一个请求过滤器链RequestFilterChain然后调用proceed方法将任务、动作名称、请求和监听器传递给过滤器链进行处理。RequestFilterChainRequest, Response requestFilterChain new RequestFilterChain(this, logger);requestFilterChain.proceed(task, actionName, request, listener);}Overridepublic void proceed(Task task, String actionName, Request request, ActionListenerResponse listener) {int i index.getAndIncrement();try {if (i this.action.filters.length) {this.action.filters[i].apply(task, actionName, request, listener, this);} else if (i this.action.filters.length) {//this.action.doExecute(task, request, listener); 中action对应的是TransportMasterNodeAction。this.action.doExecute(task, request, listener);} else {listener.onFailure(new IllegalStateException(proceed was called too many times));}} catch (Exception e) {logger.trace(Error during transport action execution., e);listener.onFailure(e);}}this.action.doExecute(task, request, listener); 中action对应的是TransportMasterNodeAction 3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法 TransportMasterNodeAction继承HandledTransportAction 而HandledTransportAction继承自TransportAction public abstract class TransportMasterNodeActionRequest extends MasterNodeRequestRequest, Response extends ActionResponse extendsHandledTransportActionRequest, ResponseimplementsActionWithReservedStateRequest {Overrideprotected void doExecute(Task task, final Request request, ActionListenerResponse listener) {//省略代码new AsyncSingleAction(task, request, listener).doStart(state);} } protected void doStart(ClusterState clusterState) {threadPool.executor(executor).execute(ActionRunnable.wrap(delegate, l - executeMasterOperation(task, request, clusterState, l)));} private void executeMasterOperation(Task task, Request request, ClusterState state, ActionListenerResponse listener)throws Exception {//调用子类实现masterOperation(task, request, state, listener);} //子类实现 protected abstract void masterOperation(Task task, Request request, ClusterState state, ActionListenerResponse listener)throws Exception; 4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引 其中创建索引的action是TransportCreateIndexAction Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListenerCreateIndexResponse listener) {createIndexService.createIndex(updateRequest,listener.map(response - new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName))); }之后调用createIndexService.createIndex创建索引
http://www.dnsts.com.cn/news/108528.html

相关文章:

  • 电子 东莞网站建设网站的表格参数怎么做
  • 如何建设社区网站首页买一个域名多少钱
  • 免费做网站软件2003写文章的网站
  • 惠州市网站制作公司商务网站策划书
  • 域名注册的网站天空建筑网站
  • 河北建设厅网站开通账号兰溪市建设局网站
  • 论坛门户网站建设运营费用百度推广图片尺寸要求
  • 建设网站的模板专业网站建站费用
  • 网站建设制作费用怎么样做网站页面
  • 企业网站申请流程游戏开发工具
  • 南昌企业网站建设哪家好WordPress哔哩哔哩主题
  • 低成本做网站 白之家一个网站可以做多少弹窗广告
  • c 多语言网站怎么做好三网网站
  • 网站备案域名还是空间手游传奇新开服网站
  • 郑州做手机网站建设网站建设画册设计
  • 网站图标在哪里做修改上海外贸网站制作公司
  • 做网站的公司为什么人少了wordpress 分享 网站
  • 东莞哪家公司做网站好男女做暖暖的试看网站酥酥影视
  • 过期网站.wordpress 假用户插件
  • 做网站模板赚钱本地门户网站
  • 网站空间做邮箱全网关键词搜索排行
  • 百度云盘搜索引擎入口江门seo全网营销
  • 手机网站广告代码网站建设公司介绍ppt
  • 网站制作过程流程网站建设方案图
  • 如何做网站短链接微信网站多少钱
  • wordpress网站数据做个企业网站 优帮云
  • 北京建设网站制作机械做卖产品网站
  • 已被网站管理员设置拦截平面设计软件学哪个比较好
  • 免费咨询图片大全大图福州短视频seo方法
  • 柒零叁网站建设品牌公司网站设计