网站广告条幅怎么做动态的,个人做网络推广哪个网站好,制作网页需要什么专业,下列关于网站开发网页上传之前写的两篇关于基于 trace_id 的链路追踪的文章#xff1a;
基于trace_id的链路追踪#xff08;含Feign、Hystrix、线程池等场景#xff09;基于trace_id的链路追踪#xff08;ForkJoinPool场景#xff09;
一、引言
在之前的文章中#xff0c;我们讨论了基于 trace…之前写的两篇关于基于 trace_id 的链路追踪的文章
基于trace_id的链路追踪含Feign、Hystrix、线程池等场景基于trace_id的链路追踪ForkJoinPool场景
一、引言
在之前的文章中我们讨论了基于 trace_id 的链路追踪的常见场景。然而最近我意识到在微服务架构中我们还缺少对一个非常常见场景的探讨在网关中如何处理 trace_id尤其是在 Reactor 异步模式下的处理。因此我决定记录下这些思考和解决方案。
二、具体场景
在Spring Cloud Gateway网关中我们需要实现请求访问日志的打印功能以便更好地排查问题。具体的实现方式包括两个全局过滤器
TraceIdGlobalFilter实现 trace_id 全局拦截先执行。AccessLogGlobalFilter实现请求访问日志的打印后执行。
在正常情况下这两个过滤器可以打印请求的 request 日志和 response 日志并且日志中都包含相同的 trace_id。然而在开发调试过程中我发现了一种异常情况request 日志中总能打印出 trace_id而 response 日志中则有时能打印出 trace_id有时却不能。这导致了 request 日志和 response 日志无法关联的问题。
三、分析
1. 为什么 response 日志没有打印 trace_id 通过分析日志我发现打印 response 日志的线程与打印 request 日志的线程并不是同一个线程。基于此我们可以判断trace_id 没有传递到打印 response 日志的线程中。
2. 为什么 trace_id 没有传递到打印 response 日志的线程中 我们知道 Spring Cloud Gateway 是基于 WebFlux Reactor 异步模式实现的因此一个请求的 request 和 response 可能由不同的线程来执行。在 TraceIdGlobalFilter 中我们使用了 MDC来传递 trace_id。然而MDC 在普通的多线程环境中有效但在 Reactor 异步模式下并不起作用。这是因为 Reactor 异步模式需要通过另外一种方式来传递 trace_id。
四、解决方案
在 WebFlux Reactor 异步模式下我们需要使用 reactor.util.context.Context 来传递 trace_id。核心逻辑如下 透传 trace_id 通过 Mono.contextWrite(context) 往 context 中设置 trace_id。 取出 trace_id 通过 Flux.deferContextual(context) 从 context 中获取 trace_id。 具体实现代码示例如下
// 设置 trace_id
Mono.contextWrite(context - context.put(trace_id, traceId));// 获取 trace_id
Flux.deferContextual(context - {String traceId context.get(trace_id);// 可将 traceId 设置到MDC中供当前线程使用return Flux.just(traceId);
});
通过这种方式我们可以确保 trace_id 在整个请求处理链路中都能被正确传递和使用解决了 request 日志和 response 日志断联的问题。
五、具体代码
TraceIdGlobalFilter
/*** trace_id 全局拦截器*/
Slf4j
Component
public class TraceIdGlobalFilter implements GlobalFilter, Ordered {Overridepublic MonoVoid filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request exchange.getRequest();String traceId request.getHeaders().getFirst(TraceConsts.TRACE_ID);// trace_idtraceId MdcUtil.attachTraceId(traceId);// 将traceId传递给下游微服务String finalTraceId traceId;ConsumerHttpHeaders headersConsumer httpHeaders - {httpHeaders.set(TraceConsts.TRACE_ID, finalTraceId);};ServerHttpRequest requestNew exchange.getRequest().mutate().headers(headersConsumer).build();return chain.filter(exchange.mutate().request(requestNew).build()).doFinally(s - {// 清除MDCMdcUtil.detachTraceId();});}Overridepublic int getOrder() {return -100;}}AccessLogGlobalFilter /*** 请求访问日志 全局拦截器*/
Slf4j
Component
public class AccessLogGlobalFilter implements GlobalFilter, Ordered {/*** gateway access log 日志开关* p* 特别注意高并发业务场景下可以关闭日志来提升性能*/Value(${com.gateway.access.log.enabled:true})private boolean logEnabled;private final HandlerStrategies handlerStrategies HandlerStrategies.withDefaults();Overridepublic MonoVoid filter(ServerWebExchange exchange, GatewayFilterChain chain) {StopWatch stopWatch new StopWatch();stopWatch.start();ServerHttpRequest httpRequest exchange.getRequest();// 日志开关直接进入下一个Filterif (!logEnabled) {return chain.filter(exchange).then(Mono.fromRunnable(() - {stopWatch.stop();// 为了方便排查问题还是打印一个简单的日志if (log.isDebugEnabled()) {log.debug(请求参数 [{}] [{}] query:{}, time: {} ms, httpRequest.getURI().getPath(), httpRequest.getMethod(), httpRequest.getURI().getRawQuery(), stopWatch.getTotalTimeMillis());}}));}// Request 处理ServerRequest request ServerRequest.create(exchange, handlerStrategies.messageReaders());// header 参数HttpHeaders httpHeaders request.headers().asHttpHeaders();// 是否为文件上传若是文件上传则不打印bodyboolean isFile null ! httpHeaders.getContentType() AccessLogUtil.isBinayBodyData(httpHeaders.getContentType().toString());// response 包装ServerHttpResponseDecorator responseDecorator responseDecoratorAndRecordLog(exchange, stopWatch);if (isFile) {// 打印请求日志this.reqLog(request, isFile, null);// 执行过滤器return chain.filter(exchange.mutate().request(request.exchange().getRequest()).response(responseDecorator).build())// 从最初的Mono本身解析一个值并将其放入上下文context中以便下游可以通过上下文context API访问它// webflux reactor 异步模式下通过 contextWrite 往context中设置trace_id.contextWrite(context - context.put(TraceConsts.TRACE_ID, MdcUtil.getTraceId()));}MonoString modifiedBody request.bodyToMono(String.class).defaultIfEmpty(CommonConsts.NULL).flatMap(body - {// 打印请求日志this.reqLog(request, isFile, body);return Mono.just(body);});// 通过 BodyInserter 插入 body(支持修改body), 避免 request body 只能获取一次// BodyInserters.fromPublisher 不支持文件上传所以不能用BodyInserterMonoString, ReactiveHttpOutputMessage bodyInserter BodyInserters.fromPublisher(modifiedBody, String.class);HttpHeaders headers new HttpHeaders();headers.putAll(exchange.getRequest().getHeaders());headers.remove(HttpHeaders.CONTENT_LENGTH);CachedBodyOutputMessage outputMessage new CachedBodyOutputMessage(exchange, headers);return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() - {// request 包装ServerHttpRequestDecorator requestDecorator requestDecorator(exchange, headers, outputMessage);// 执行过滤器return chain.filter(exchange.mutate().request(requestDecorator).response(responseDecorator).build())// 从最初的Mono本身解析一个值并将其放入上下文context中以便下游可以通过上下文context API访问它// webflux reactor 异步模式下通过 contextWrite 往context中设置trace_id.contextWrite(context - context.put(TraceConsts.TRACE_ID, MdcUtil.getTraceId()));}));}Overridepublic int getOrder() {return -90;}/*** 打印 request log*/private void reqLog(ServerRequest request, boolean isFile, String body) {// URL query 参数String queryString request.uri().getRawQuery();// header 参数HttpHeaders headers request.headers().asHttpHeaders();String headersParams headersToString(headers);if (isFile) {if (log.isInfoEnabled()) {log.info(请求参数 [{}] [{}] query:{}, headers:{}, request.uri().getPath(), request.methodName(), queryString, headersParams);}return;}// request body 长度处理避免太长打印耗性能String requestBody AccessLogUtil.fixFieldAndReplaceWhite(body, AccessLogUtil.DEF_MAX_LEN);if (log.isInfoEnabled()) {log.info(请求参数 [{}] [{}] query:{}, headers:{}, body:{}, request.uri().getPath(), request.methodName(), queryString, headersParams, requestBody);}}/*** 过滤headers避免打印过多的日志*/private String headersToString(HttpHeaders headers) {MapString, String map new HashMapString, String();for (Map.EntryString, ListString entry : headers.entrySet()) {if (RequestParamUtil.containsHeader(entry.getKey())) {map.put(entry.getKey(), entry.getValue().toString());}}return JSON.toJSONString(map);}/*** Request装饰器重新计算 headers*/private ServerHttpRequestDecorator requestDecorator(ServerWebExchange exchange, HttpHeaders headers,CachedBodyOutputMessage outputMessage) {return new ServerHttpRequestDecorator(exchange.getRequest()) {Overridepublic HttpHeaders getHeaders() {long contentLength headers.getContentLength();HttpHeaders httpHeaders new HttpHeaders();httpHeaders.putAll(super.getHeaders());if (contentLength 0) {httpHeaders.setContentLength(contentLength);} else {httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, chunked);}return httpHeaders;}Overridepublic FluxDataBuffer getBody() {return outputMessage.getBody();}};}/*** Response装饰器记录响应日志* p* 通过 DataBufferFactory 解决响应体分段传输问题。*/private ServerHttpResponseDecorator responseDecoratorAndRecordLog(ServerWebExchange exchange, StopWatch stopWatch) {ServerHttpResponse response exchange.getResponse();DataBufferFactory bufferFactory response.bufferFactory();return new ServerHttpResponseDecorator(response) {Overridepublic MonoVoid writeWith(Publisher? extends DataBuffer body) {stopWatch.stop();if (!(body instanceof Flux)) {return super.writeWith(body);}// 获取响应类型String responseContentType exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);if (AccessLogUtil.isBinayBodyData(responseContentType)) {if (log.isInfoEnabled()) {log.info(响应参数: time {} ms, stopWatch.getTotalTimeMillis());}return super.writeWith(body);}// info及以上日志级别才做如下处理if (log.isInfoEnabled()) {Flux? extends DataBuffer fluxBody Flux.from(body).flatMap(dataBuffer - Flux.deferContextual(context - {// webflux reactor 异步模式下通过 deferContextual 取出context中的trace_idMdcUtil.putTraceId(context.get(TraceConsts.TRACE_ID));if (log.isDebugEnabled()) {log.debug(spring cloud gateway webflux reactor 异步模式下透传trace_id: {}, MdcUtil.getTraceId());}return Flux.just(dataBuffer);})).doFinally(signalType - {// 清理掉trace_idMdcUtil.removeTraceId();});return super.writeWith(fluxBody.buffer().map(dataBuffers - {// 合并多个流集合解决返回体分段传输DataBufferFactory dataBufferFactory new DefaultDataBufferFactory();DataBuffer join dataBufferFactory.join(dataBuffers);byte[] content new byte[join.readableByteCount()];join.read(content);// 释放掉内存DataBufferUtils.release(join);String responseBody new String(content, StandardCharsets.UTF_8);// response body 长度处理避免太长打印耗性能responseBody AccessLogUtil.fixFieldAndReplaceWhite(responseBody, AccessLogUtil.DEF_MAX_LEN);log.info(响应参数: {}, time {} ms, responseBody, stopWatch.getTotalTimeMillis());return bufferFactory.wrap(content);}));}return super.writeWith(body);}};}
}