当前位置: 首页 > news >正文

成都解放号网站建设广州网站建设信科网络

成都解放号网站建设,广州网站建设信科网络,wordpress申请网站,全球热门网站排名文章目录 背景本公司的设计业务调研资源调研架构设计定时任务设计httpclient连接池重试机制请求收敛时间切片的动态分配API调用大盘告警手动抓取接口线程池提速定义业务线程池拒绝策略 不丢弃任务的处理方案规避触发拒绝策略触发拒绝策略之后的处理 分布式锁平台接口限频处理 待… 文章目录 背景本公司的设计业务调研资源调研架构设计定时任务设计httpclient连接池重试机制请求收敛时间切片的动态分配API调用大盘告警手动抓取接口线程池提速定义业务线程池拒绝策略 不丢弃任务的处理方案规避触发拒绝策略触发拒绝策略之后的处理 分布式锁平台接口限频处理 待扩展点todo 背景 我们在抖音,快手视频等平台运营者很多账号, 各个平台也提供了后台服务供观察投流效果。 但是每个平台都需要登录很麻烦公司也想收集投流的数据整合一套投流的智能系统提高roi收益。所以最近经手了从快手视频号定时下载数据的工作。 另外曾供职某erp公司的平台数据部门所以这里总结下定时任务下载任务要怎么设计。 本公司的设计 业务调研 公司在抖音,快手,视频号,小红书,天猫,京东,拼多多,得物,百度平台投流这些平台在各自的平台提供了api。所以整体的模式是 step1 -- 请求平台的接口,拿到投流数据 step2 -- 落到mysql数据库 step3 -- 数仓读数据, 然后定期清理mysql数据(mysql只有500G 不清理的情况下很容易被业务数据灌满) 资源调研 公司客观限制 前期只能提供一台8c 16G 200G的服务器。资源明确不足时 才可以申请新资源业务没有上云, 缺少k8s的基础 架构设计 标准的springcloud alibaba架构, 在单实例无法应该业务的时候, 部署多台platform实例, 预留扩展空间 platform维护定时任务的具体实现 system 系统服务 auth 鉴权相关 job 定时任务 定时任务设计 定时触发 job依赖quartz完成定时任务的触发 对比项QuartzXXL-Job分布式调度❌ 原生不支持✅ 支持简单分布式调度UI 管理❌ 无官方 UI需集成✅ 提供 Web 控制台动态任务配置❌ 靠代码配置✅ 控制台配置失败重试❌ 需自定义✅ 内建支持定时精度✅ 毫秒级⚠️ 秒级为主易用性❌ 较复杂✅ 简单易用执行隔离✅ 自定义线程池⚠️ 需手动配置隔离默认不隔离社区支持✅ 社区较活跃⚠️ 活跃度一般 job服务通过feign调用platform服务 微服务内部调用不通过gateway, 在服务注册与发现中心查找ServiceNameConstants.PLATFORM_SERVICE服务列表,使用负载均衡器选择一个可用实例, 然后发起http请求 FeignClient(contextId remotePlatformService,value ServiceNameConstants.PLATFORM_SERVICE,fallbackFactory RemotePlatformFallbackFactory.class) public interface RemotePlatformService { }工厂,模版模式实现主流程 3.1 模版实现公共逻辑 3.1.1 将任务单元准备好 public interface DownloadTaskBaseService {/*** 下载任务的入口*/void downloadJob(SphJobEnum jobEnum, String requestId);/*** 手动下载任务* param reqVO*/void downloadByManual(SphManualReqVO reqVO); }Overridepublic void downloadJob(SphJobEnum jobEnum, String requestId) {// 0.0 这是长耗时任务, 加锁1小时try {// 1.0 查询可用的授权列表// 2.0 轮训各个授权的对应的广告主的增量广告计划loopDownloadJob()} finally {// 3.0 解锁统计耗时}}private void loopDownloadJob(ListSphOauth2AccessTokenDO validAuthList, String requestId, SphJobEnum jobEnum) {// 1.0 乱序广告主,让广告主的执行更公平// 2.0 按照配置对广告主分组,为线程池提供数据支撑}private void dispatchTask(ListSphAuthAdvertiserInfoDO advertiserList, SphOauth2AccessTokenDO tokenDO,String requestId, SphJobEnum jobEnum, AtomicInteger oauthCount, AtomicInteger oauthCurrentCounter,AtomicInteger taskTotalCounter, AtomicInteger taskCurrentCounter) {// 线程池加速下载, 编排任务 规避线程池拒绝ListCompletableFutureString featureList new ArrayList();for (SphAuthAdvertiserInfoDO advertiserInfoDO : advertiserList) {CompletableFutureString feature CompletableFuture.supplyAsync(() - {downloadJob(advertiserInfoDO, tokenDO,authProcess,advertiserProcess,requestId, jobEnum);return authProcess advertiserProcess 执行完毕;}, sphThreadPoolExecutor).exceptionally((ex - {return 执行失败: ex.getCause().getMessage();}));featureList.add(feature);}CompletableFuture.allOf(featureList.toArray(new CompletableFuture[0])).join();}3.1.2 子任务完成数据下载以及数据入库 public abstract class SphDownloadTaskBaseServiceImpl implements SphDownloadTaskBaseService {/*** 需要具体的子任务实现* 1. 查询查询结果* 2. 批量入库* param lastRequestTimeDO* param endTime* param advertiserInfoDO* param tokenDO*/protected abstract void downloadJob(SysDictData lastRequestTimeDO, LocalDateTime endTime, SphAuthAdvertiserInfoDO advertiserInfoDO, SphOauth2AccessTokenDO tokenDO, String authProgress, String advertiserProgress);/*** 需要具体的子任务实现* 查询的上次查询时间* return*/protected abstract SysDictData getLastRequestTime(SphAuthAdvertiserInfoDO advertiserInfoDO);/*** 子任务实现** 保存请求时间* param lastRequestTimeDO* param endTime* param advertiserInfoDO*/protected abstract void saveLastRequestTime(SysDictData lastRequestTimeDO, LocalDateTime endTime, SphAuthAdvertiserInfoDO advertiserInfoDO); }3.1.3 工厂模式调用各个子任务 platform服务对外job服务暴漏下载功能 /*** 查询服务* param jobEnum* return*/private SphDownloadTaskBaseService getTaskService(SphJobEnum jobEnum) {return SpringUtils.getBean(jobEnum.getJobName());}private void downloadJob(SphJobEnum jobEnum, String requestId) {switch (jobEnum) {case SPH_ADGROUP_DAY_REPORT:case SPH_ADGROUP_HOUR_REPORT:case SPH_REFRESH_TOKEN:case SPH_DOWNLOAD_ADGROUP:case SPH_DOWNLOAD_VIDEO:case SPH_DOWNLOAD_PIC:case SPH_DOWNLOAD_WECHAT_AUTHORIZATION:case SPH_DOWNLOAD_CAMPAIGN:case SPH_DOWNLOAD_VIDEO_DAY_REPORT:case SPH_DOWNLOAD_IMAGE_DAY_REPORT:getTaskService(jobEnum).downloadJob(jobEnum, requestId);break;default: {log.error(不支持该该类型的定时任务 ServiceName:{} 任务描述{}, jobEnum.getJobName(), jobEnum.getDesc());throw new ServiceException(不支持该该类型的定时任务);}}}httpclient连接池 从业务模式可以看出需要对平台发起大量的请求, 所以很有必要引入httpclient连接池来管理连接 引入httpclient依赖 dependencygroupIdorg.apache.httpcomponents/groupIdartifactIdhttpclient/artifactId/dependency维护工具类 - 请求工具类 HttpUtils 2.1 HttpUtils.getCallWithPojo 请求入参的GET请求 2.2 HttpUtils.postCallWithToken POST请求 public class HttpUtils {private static final Logger log LoggerFactory.getLogger(HttpUtils.class);public static RequestConfig requestConfig;private static CloseableHttpClient httpClient;private static PoolingHttpClientConnectionManager connMgr;private static IdleConnectionMonitorThread idleThread;static {HttpUtils.initClient();}/*** 向指定 URL 发送GET方法的请求** param url 发送请求的 URL* return 所代表远程资源的响应结果*/public static String sendGet(String url) {return sendGet(url, StringUtils.EMPTY);}/*** 向指定 URL 发送GET方法的请求** param url 发送请求的 URL* param param 请求参数请求参数应该是 name1value1name2value2 的形式。* return 所代表远程资源的响应结果*/public static String sendGet(String url, String param) {return sendGet(url, param, Constants.UTF8);}/*** 向指定 URL 发送GET方法的请求** param url 发送请求的 URL* param param 请求参数请求参数应该是 name1value1name2value2 的形式。* param contentType 编码类型* return 所代表远程资源的响应结果*/public static String sendGet(String url, String param, String contentType) {StringBuilder result new StringBuilder();BufferedReader in null;try {String urlNameString StringUtils.isNotBlank(param) ? url ? param : url;log.info(sendGet - {}, urlNameString);URL realUrl new URL(urlNameString);URLConnection connection realUrl.openConnection();connection.setRequestProperty(accept, */*);connection.setRequestProperty(connection, Keep-Alive);connection.setRequestProperty(user-agent, Mozilla/5.0 (Windows NT 10.0; Win64; x64));connection.connect();in new BufferedReader(new InputStreamReader(connection.getInputStream(), contentType));String line;while ((line in.readLine()) ! null) {result.append(line);}log.info(recv - {}, result);} catch (ConnectException e) {log.error(调用HttpUtils.sendGet ConnectException, url url ,param param, e);} catch (SocketTimeoutException e) {log.error(调用HttpUtils.sendGet SocketTimeoutException, url url ,param param, e);} catch (IOException e) {log.error(调用HttpUtils.sendGet IOException, url url ,param param, e);} catch (Exception e) {log.error(调用HttpsUtil.sendGet Exception, url url ,param param, e);} finally {try {if (in ! null) {in.close();}} catch (Exception ex) {log.error(调用in.close Exception, url url ,param param, ex);}}return result.toString();}/*** 向指定 URL 发送POST方法的请求** param url 发送请求的 URL* param param 请求参数请求参数应该是 name1value1name2value2 的形式。* return 所代表远程资源的响应结果*/public static String sendPost(String url, String param) {PrintWriter out null;BufferedReader in null;StringBuilder result new StringBuilder();try {log.info(sendPost - {}, url);URL realUrl new URL(url);URLConnection conn realUrl.openConnection();conn.setRequestProperty(accept, */*);conn.setRequestProperty(connection, Keep-Alive);conn.setRequestProperty(user-agent, Mozilla/5.0 (Windows NT 10.0; Win64; x64));conn.setRequestProperty(Accept-Charset, utf-8);conn.setRequestProperty(contentType, utf-8);conn.setDoOutput(true);conn.setDoInput(true);out new PrintWriter(conn.getOutputStream());out.print(param);out.flush();in new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8));String line;while ((line in.readLine()) ! null) {result.append(line);}log.info(recv - {}, result);} catch (ConnectException e) {log.error(调用HttpUtils.sendPost ConnectException, url url ,param param, e);} catch (SocketTimeoutException e) {log.error(调用HttpUtils.sendPost SocketTimeoutException, url url ,param param, e);} catch (IOException e) {log.error(调用HttpUtils.sendPost IOException, url url ,param param, e);} catch (Exception e) {log.error(调用HttpsUtil.sendPost Exception, url url ,param param, e);} finally {try {if (out ! null) {out.close();}if (in ! null) {in.close();}} catch (IOException ex) {log.error(调用in.close Exception, url url ,param param, ex);}}return result.toString();}public static String sendSSLPost(String url, String param) {StringBuilder result new StringBuilder();String urlNameString url ? param;try {log.info(sendSSLPost - {}, urlNameString);SSLContext sc SSLContext.getInstance(SSL);sc.init(null, new TrustManager[]{new TrustAnyTrustManager()}, new java.security.SecureRandom());URL console new URL(urlNameString);HttpsURLConnection conn (HttpsURLConnection) console.openConnection();conn.setRequestProperty(accept, */*);conn.setRequestProperty(connection, Keep-Alive);conn.setRequestProperty(user-agent, Mozilla/5.0 (Windows NT 10.0; Win64; x64));conn.setRequestProperty(Accept-Charset, utf-8);conn.setRequestProperty(contentType, utf-8);conn.setDoOutput(true);conn.setDoInput(true);conn.setSSLSocketFactory(sc.getSocketFactory());conn.setHostnameVerifier(new TrustAnyHostnameVerifier());conn.connect();InputStream is conn.getInputStream();BufferedReader br new BufferedReader(new InputStreamReader(is));String ret ;while ((ret br.readLine()) ! null) {if (ret ! null !ret.trim().equals()) {result.append(new String(ret.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8));}}log.info(recv - {}, result);conn.disconnect();br.close();} catch (ConnectException e) {log.error(调用HttpUtils.sendSSLPost ConnectException, url url ,param param, e);} catch (SocketTimeoutException e) {log.error(调用HttpUtils.sendSSLPost SocketTimeoutException, url url ,param param, e);} catch (IOException e) {log.error(调用HttpUtils.sendSSLPost IOException, url url ,param param, e);} catch (Exception e) {log.error(调用HttpsUtil.sendSSLPost Exception, url url ,param param, e);}return result.toString();}private static class TrustAnyTrustManager implements X509TrustManager {Overridepublic void checkClientTrusted(X509Certificate[] chain, String authType) {}Overridepublic void checkServerTrusted(X509Certificate[] chain, String authType) {}Overridepublic X509Certificate[] getAcceptedIssuers() {return new X509Certificate[]{};}}private static class TrustAnyHostnameVerifier implements HostnameVerifier {Overridepublic boolean verify(String hostname, SSLSession session) {return true;}}/*** 获取httpClient** return*/public static CloseableHttpClient getHttpClient() {if (httpClient ! null) {return httpClient;} else {return HttpClients.createDefault();}}/*** 创建连接池管理器** return*/private static PoolingHttpClientConnectionManager createConnectionManager() {PoolingHttpClientConnectionManager connMgr new PoolingHttpClientConnectionManager();// 将最大连接数增加到connMgr.setMaxTotal(HttpConf.MAX_TOTAL_CONN);// 将每个路由基础的连接增加到connMgr.setDefaultMaxPerRoute(HttpConf.MAX_ROUTE_CONN);return connMgr;}/*** 根据当前配置创建HTTP请求配置参数。** return 返回HTTP请求配置。*/private static RequestConfig createRequestConfig() {Builder builder RequestConfig.custom();builder.setConnectionRequestTimeout(StringUtils.nvl(HttpConf.WAIT_TIMEOUT, 10000));builder.setConnectTimeout(StringUtils.nvl(HttpConf.CONNECT_TIMEOUT, 10000));builder.setSocketTimeout(StringUtils.nvl(HttpConf.SO_TIMEOUT, 10000));return builder.build();}/*** 创建默认的HTTPS客户端信任所有的证书。** return 返回HTTPS客户端如果创建失败返回HTTP客户端。*/private static CloseableHttpClient createHttpClient(HttpClientConnectionManager connMgr) {try {final SSLContext sslContext new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {Overridepublic boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {// 信任所有return true;}}).build();final SSLConnectionSocketFactory sslsf new SSLConnectionSocketFactory(sslContext);// 重试机制HttpRequestRetryHandler retryHandler new DefaultHttpRequestRetryHandler(HttpConf.RETRY_COUNT, true);ConnectionKeepAliveStrategy connectionKeepAliveStrategy new ConnectionKeepAliveStrategy() {Overridepublic long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) {return HttpConf.KEEP_ALIVE_TIMEOUT; // tomcat默认keepAliveTimeout为20s}};httpClient HttpClients.custom().setSSLSocketFactory(sslsf).setConnectionManager(connMgr).setDefaultRequestConfig(requestConfig).setRetryHandler(retryHandler).setKeepAliveStrategy(connectionKeepAliveStrategy).build();} catch (Exception e) {log.error(Create http client failed, e);httpClient HttpClients.createDefault();}return httpClient;}/*** 初始化 只需调用一次*/public synchronized static CloseableHttpClient initClient() {if (httpClient null) {connMgr createConnectionManager();requestConfig createRequestConfig();// 初始化httpClient连接池httpClient createHttpClient(connMgr);// 清理连接池idleThread new IdleConnectionMonitorThread(connMgr);idleThread.start();}return httpClient;}/*** 关闭HTTP客户端。** param*/public synchronized static void shutdown() {try {if (idleThread ! null) {idleThread.shutdown();idleThread null;}} catch (Exception e) {log.error(httpclient connection manager close, e);}try {if (httpClient ! null) {httpClient.close();httpClient null;}} catch (IOException e) {log.error(httpclient close, e);}}/*** 请求上游 GET提交** param uri* throws IOException*/public static String getCall(final String uri) throws Exception {return getCall(uri, null, Constants.UTF8);}/*** 请求上游 GET提交** param uri* param contentType* throws IOException*/public static String getCall(final String uri, String contentType) throws Exception {return getCall(uri, contentType, Constants.UTF8);}/*** 携带入参的GET请求* param baseUri 请求基础URL* param pojo 请求参数对象* param K 参数类型* return 响应字符串(JSON格式)* throws IOException 当HTTP请求或响应处理失败时抛出* throws JSONException 当响应不是有效JSON时抛出*/public static K String getCallWithPojo(final String baseUri, K pojo) throws IOException, JSONException {// 参数校验if (baseUri null || baseUri.trim().isEmpty()) {throw new IllegalArgumentException(Base URI cannot be null or empty);}// 构建URIUriComponentsBuilder builder UriComponentsBuilder.fromHttpUrl(baseUri);ListField fields getAllFields(pojo.getClass());// 缓存反射结果以提高性能for (Field field : fields) {field.setAccessible(true);try {Object value field.get(pojo);if (value ! null) {String paramName CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE,field.getName());// 处理不同类型参数if (value instanceof String){// URL编码普通参数值builder.queryParam(paramName, URLEncoder.encode(value.toString(), StandardCharsets.UTF_8.name()));} else {// 对JSON字符串进行URL编码String jsonValue JsonUtils.toJsonString(value);builder.queryParam(paramName, URLEncoder.encode(jsonValue, StandardCharsets.UTF_8.name()));}}} catch (IllegalAccessException | UnsupportedEncodingException e) {log.warn(Failed to process field {}: {}, field.getName(), e.getMessage());}}String finalUrl builder.build().toUriString();HttpGet httpGet new HttpGet(finalUrl);httpGet.setConfig(requestConfig);// 使用try-with-resources确保资源释放try (CloseableHttpResponse httpRsp getHttpClient().execute(httpGet)) {int statusCode httpRsp.getStatusLine().getStatusCode();// 只处理成功或特定错误状态if (statusCode HttpStatus.SC_OK || statusCode HttpStatus.SC_FORBIDDEN) {HttpEntity entity httpRsp.getEntity();try {String rspText EntityUtils.toString(entity, StandardCharsets.UTF_8);// 提取跟踪IDHeader traceIdHeader httpRsp.getFirstHeader(X-Tsa-Trace-Id);String traceId traceIdHeader ! null ? traceIdHeader.getValue() : null;// 构造响应JSONJSONObject responseJson new JSONObject(rspText);if (traceId ! null) {responseJson.putOpt(request_id, traceId);}return responseJson.toString();} finally {EntityUtils.consumeQuietly(entity);}} else {throw new IOException(HTTP request failed with status code: statusCode , URL: finalUrl);}} catch (JSONException e) {throw new JSONException(Invalid JSON response from: finalUrl, e);} catch (Exception e) {throw new IOException(HTTP request failed for URL: finalUrl, e);}}/*** 递归获取类所有字段(包括父类)* param type 目标类* return 字段列表*/private static ListField getAllFields(Class? type) {ListField fields new ArrayList();Class? currentClass type;// 使用循环替代递归提高性能while (currentClass ! null currentClass ! Object.class) {fields.addAll(Arrays.asList(currentClass.getDeclaredFields()));currentClass currentClass.getSuperclass();}return fields;}/*** 请求上游 GET提交** param uri* param contentType* param charsetName* throws IOException*/public static String getCall(final String uri, String contentType, String charsetName) throws Exception {final String url uri;final HttpGet httpGet new HttpGet(url);httpGet.setConfig(requestConfig);if (!StringUtils.isEmpty(contentType)) {httpGet.addHeader(Content-Type, contentType);}final CloseableHttpResponse httpRsp getHttpClient().execute(httpGet);try {if (httpRsp.getStatusLine().getStatusCode() HttpStatus.SC_OK|| httpRsp.getStatusLine().getStatusCode() HttpStatus.SC_FORBIDDEN) {final HttpEntity entity httpRsp.getEntity();final String rspText EntityUtils.toString(entity, charsetName);// 提取 X-Tsa-Trace-IdHeader traceIdHeader httpRsp.getFirstHeader(X-Tsa-Trace-Id);String traceId traceIdHeader ! null ? traceIdHeader.getValue() : null;EntityUtils.consume(entity);// 构造返回的 JSON包含响应体和 traceIdJSONObject responseJson new JSONObject(rspText); // 解析原始JSONresponseJson.putOpt(request_id, traceId); // 直接添加到顶层return responseJson.toString(); // 返回修改后的JSON} else {throw new IOException(HTTP StatusCode httpRsp.getStatusLine().getStatusCode());}} finally {try {httpRsp.close();} catch (Exception e) {log.error(关闭httpRsp异常, e);}}}/*** 请求上游 POST提交** param uri* param paramsMap* throws IOException*/public static String postCall(final String uri, MapString, Object paramsMap) throws Exception {return postCall(uri, null, paramsMap, Constants.UTF8);}/*** 请求上游 POST提交** param uri* param contentType* param paramsMap* throws IOException*/public static String postCall(final String uri, String contentType, MapString, Object paramsMap) throws Exception {return postCall(uri, contentType, paramsMap, Constants.UTF8);}/*** 请求上游 POST提交** param uri* param contentType* param paramsMap* param charsetName* throws IOException*/public static String postCall(final String uri, String contentType, MapString, Object paramsMap,String charsetName) throws Exception {final String url uri;final HttpPost httpPost new HttpPost(url);httpPost.setConfig(requestConfig);if (!StringUtils.isEmpty(contentType)) {httpPost.addHeader(Content-Type, contentType);}// 添加参数ListNameValuePair list new ArrayListNameValuePair();if (paramsMap ! null) {for (Map.EntryString, Object entry : paramsMap.entrySet()) {list.add(new BasicNameValuePair(entry.getKey(), (String) entry.getValue()));}}httpPost.setEntity(new UrlEncodedFormEntity(list, charsetName));final CloseableHttpResponse httpRsp getHttpClient().execute(httpPost);try {if (httpRsp.getStatusLine().getStatusCode() HttpStatus.SC_OK) {final HttpEntity entity httpRsp.getEntity();final String rspText EntityUtils.toString(entity, charsetName);EntityUtils.consume(entity);return rspText;} else {throw new IOException(HTTP StatusCode httpRsp.getStatusLine().getStatusCode());}} finally {try {httpRsp.close();} catch (Exception e) {log.error(关闭httpRsp异常, e);}}}/*** 请求上游 POST提交** param uri* param param* throws IOException*/public static String postCall(final String uri, String param) throws Exception {return postCall(uri, null, param, Constants.UTF8);}/*** 携带token的post请求** param uri* param param* param token* return* throws Exception*/public static String postCallWithToken(final String uri, String param, String token) throws Exception {return postCall(uri, null, param, Constants.UTF8, token);}/*** 请求上游 POST提交** param uri* param contentType* param param* throws IOException*/public static String postCall(final String uri, String contentType, String param) throws Exception {return postCall(uri, contentType, param, Constants.UTF8);}/*** 请求上游 POST提交** param uri* param contentType* param param* param charsetName* throws IOException*/public static String postCall(final String uri, String contentType, String param, String charsetName)throws Exception {return postCall(uri, contentType, param, charsetName, null);}/*** 请求上游 POST提交** param uri* param contentType* param param* param charsetName* throws IOException*/public static String postCall(final String uri, String contentType, String param, String charsetName, String token)throws Exception {final String url uri;final HttpPost httpPost new HttpPost(url);httpPost.setConfig(requestConfig);if (!StringUtils.isEmpty(contentType)) {httpPost.addHeader(Content-Type, contentType);} else {httpPost.addHeader(Content-Type, application/json);}// 设置tokenif (StrUtil.isNotEmpty(token)) {httpPost.addHeader(Access-Token, token);}// 添加参数StringEntity paramEntity new StringEntity(param, charsetName);httpPost.setEntity(paramEntity);final CloseableHttpResponse httpRsp getHttpClient().execute(httpPost);try {if (httpRsp.getStatusLine().getStatusCode() HttpStatus.SC_OK) {final HttpEntity entity httpRsp.getEntity();final String rspText EntityUtils.toString(entity, charsetName);EntityUtils.consume(entity);return rspText;} else {throw new IOException(HTTP StatusCode httpRsp.getStatusLine().getStatusCode());}} finally {try {httpRsp.close();} catch (Exception e) {log.error(关闭httpRsp异常, e);}}}/*** 判断HTTP异常是否为读取超时。** param e 异常对象。* return 如果是读取引起的异常而非连接则返回true否则返回false。*/public static boolean isReadTimeout(final Throwable e) {return (!isCausedBy(e, ConnectTimeoutException.class) isCausedBy(e, SocketTimeoutException.class));}/*** 检测异常e被触发的原因是不是因为异常cause。检测被封装的异常。** param e 捕获的异常。* param cause 异常触发原因。* return 如果异常e是由cause类异常触发则返回true否则返回false。*/public static boolean isCausedBy(final Throwable e, final Class? extends Throwable cause) {if (cause.isAssignableFrom(e.getClass())) {return true;} else {Throwable t e.getCause();while (t ! null t ! e) {if (cause.isAssignableFrom(t.getClass())) {return true;}t t.getCause();}return false;}} } 关闭httpclient连接池,在项目重启时释放资源 Component public class ShutdownManagerConfig {private static final Logger logger LoggerFactory.getLogger(ShutdownManagerConfig.class);PreDestroypublic void destroy(){// 关闭线程池HttpUtils.shutdown();logger.info(关闭http连接线程池);} }重试机制 http请求难免有些需要重试的, 这里引入spring-retry 解决 引入依赖 dependencygroupIdorg.springframework.retry/groupIdartifactIdspring-retry/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-aop/artifactId/dependency使用例子 2.1 value 指定要触发重试的异常, 不是所有场景都需要重试的 2.2 maxAttempts 指定重试的最大次数 2.3 backoff 重试次略 2.3.1 backoff.delay 首次延迟时间 2.3.2 backoff.multiplier 之后的每次重试延迟时间乘multiplier 2.3.3 backoff.random 延迟时间随机抖动避免多个客户端同时重试 Retryable(value {RetryException.class}, maxAttempts 5, backoff Backoff(delay 1000, multiplier 2, random true))/*** 广告计划数据实时* 触发限频,重试策略* 1. 最大重试次数包括第一次调用 5次* 2. 重试间隔(ms) 1000ms* 3. multiplier 延迟乘数下次延迟 当前延迟 * multiplier* 4. random 是否在延迟时间上添加随机抖动 true* param reqVO* param token* return*/OverrideRetryable(value {RetryException.class}, maxAttempts 5, backoff Backoff(delay 1000, multiplier 2, random true))OverrideRetryable(value {RetryException.class}, maxAttempts 3, backoff Backoff(delay 1000, multiplier 2, random true))public SphApBaseRespVOSphApiImageDailyReportsRespVO getImageDailyReports(SphApiDailyReportsReqVO req) {SphApBaseRespVOSphApiImageDailyReportsRespVO apiResp getApiResp(SphApiEnum.GET_DAILY_REPORTS, req, SphApiImageDailyReportsRespVO.class);apiResp.setData(JsonUtils.parseObject(JsonUtils.toJsonString(apiResp.getData()), SphApiImageDailyReportsRespVO.class));return apiResp;}请求收敛 将对平台接口的请求封装到一个函数中,这么做有什么好处呢? 统一入口降低复杂度 1.1 简化调用所有接口请求通过单一函数处理调用方无需关心SDK的具体实现只需关注业务参数。 1.2 减少重复代码避免在每个调用处重复初始化SDK、处理认证等逻辑。 如下图每次请求参数都统一设置一个随机数。集中管理请求逻辑 2.1 参数标准化统一处理参数校验、默认值、格式转换如时间戳、枚举值。 2.2 错误处理集中捕获网络异常、快手API错误码并转换为一致的错误格式例如抛出特定异常或返回统一错误对象。 2.3 日志与监控方便统一添加请求日志、性能监控如耗时统计和埋点。 3 扩展性优化 3.1 接口重试Retryable 可以用在这一层 (需要处理自调用导致Retryable失效的问题) 3.2 各个接口的统计记数, 错误分析 都可以在这一层通过切面很方便的完成 private K extends SphApiBaseReqVO, T SphApBaseRespVOT getApiResp(SphApiEnum apiEnum, K params, ClassT dataType) {String resp ;try {params.setNonce(UUID.randomUUID().toString());SphRateLimiterManager.getApiRateLimiter(apiEnum).acquire();resp HttpUtils.getCallWithPojo(apiEnum.getUrl(), params);return getFormatApiResp(apiEnum, resp, dataType, params);} catch (ServiceException | RetryException e) {saveErrorResponse(apiEnum, params, resp, );throw e;} catch (Exception e) {log.error(视频号api请求失败 {}接口 入参{} 异常信息:{}, apiEnum.getDesc(), params, e.getMessage());saveErrorResponse(apiEnum, params, resp, );throw new ServiceException(e.getMessage(), SPH_API_REQUEST_ERROR.getCode());}}时间切片的动态分配 很多接口会限制单次分页请求可以抓到数据, 这也是常见的解决深度分页问题一个方案了。 那这时候 就需要在发生这种情况时把时间参数切成更小的粒度然后再次请求。 下面是递归时间切片的demo private ListSphImageDayReportDO downloadDayReport(ListLocalDateTime[] requestTimeList, SphAuthAdvertiserInfoDO advertiserInfoDO,SphOauth2AccessTokenDO tokenDO,String authProgress, String advertiserProgress, DateIntervalEnum dateIntervalEnum) {ListSphImageDayReportDO detailsList new ArrayList();for (LocalDateTime[] timeRange : requestTimeList) {try {// 1.0 常规请求} catch (ServiceException e) {// 2.0 触发深度分页的场景if (!e.getMessage().contains(请降低查询数据范围)) {throw e;}// 2.1 递归切成更小的时间片detailsList.addAll(getRollbackDOList(timeRange, advertiserInfoDO, tokenDO,authProgress, advertiserProgress, dateIntervalEnum));}if (detailsList.size() 100) {// 插入本批次的数据}}return detailsList;}private ListSphImageDayReportDO getRollbackDOList(LocalDateTime[] timeRange, SphAuthAdvertiserInfoDO advertiserInfoDO,SphOauth2AccessTokenDO tokenDO,String authProgress, String advertiserProgress, DateIntervalEnum dateIntervalEnum) {DateIntervalEnum oldDateIntervalEnum dateIntervalEnum;// 1.0 换成更细粒度的切片骨子额dateIntervalEnum getRollbackDateIntervalEnum(dateIntervalEnum);// 2.0 避免死循环判断是否已经是最小粒度if (oldDateIntervalEnum.equals(dateIntervalEnum)) throw new ServiceException(已经进入了最小时间切片颗粒, 但是还是太多了);}// 新时间切片, 递归return downloadDayReport(LocalDateTimeUtils.getDateRangeList(timeRange[0], timeRange[1], dateIntervalEnum), advertiserInfoDO, tokenDO, authProgress, advertiserProgress, dateIntervalEnum);}protected DateIntervalEnum getRollbackDateIntervalEnum(DateIntervalEnum dateIntervalEnum) {// 分钟为最小维度if (DateIntervalEnum.MINUTE.equals(dateIntervalEnum)) {return DateIntervalEnum.MINUTE;}int rollbackLevel dateIntervalEnum.getInterval() - 1;return DateIntervalEnum.valueOf(rollbackLevel);}API调用大盘 这一项是非常非常重要但是没排期做的。 告警 这一项是非常非常重要但是没排期做的。 这个跟API大盘搭配起来才能不断地优化项目。 手动抓取接口 这个功能是非常常见的功能如果出现漏单或者单据状态与平台不一致的情况可以手动抓某个单据或者时间段 以达成 更新数据 用来核验平台接口响应我们入库数据,平台数据三方有什么异同的依据 重抓某个时段的数据 public String downloadByManual(SphManualReqVO reqVO) {// 1.0 获取任务枚举类SphJobEnum jobEnum SphJobEnum.of(reqVO.getJobName());// 2.0 加锁redisLock.tryLock(buildManualLockKey(jobEnum.getJobName()), 5, 1200, TimeUnit.SECONDS);try {// 3.0 验参validateManualParam(reqVO, jobEnum);// 4.0 执行手动任务getTaskService(jobEnum).downloadByManual(reqVO);} finally {// 5.0 释放锁redisLock.unlock(buildManualLockKey(jobEnum.getJobName()));}} 线程池提速 视频号有7300个广告主 每个广告主执行1分钟 那么任务轮训一遍需要5天 那这个效率自然是不能接受的, 这时候自然要引入线程池 定义业务线程池 从线程池的构造函数 可以看到 corePoolSize 核心线数,池子中会一直持有的线程数maximumPoolSize 最大临时线程数keepAliveTime 临时线程在指定空闲时间之后会被释放unit 空闲时间单位workQueue 阻塞队列, 为避免oom一定要设置成有界阻塞队列 threadFactory 线程工厂, 各个业务要有不同的名字可以通过该工厂实现 RejectedExecutionHandler 拒绝策略, 自定义拒绝策略 我这里只记录了日志,因为下面会展示通过编排任务 来规避触发拒绝策略 线程池优先使用核心线程池处理,核心线程池打满之后 丢入 拒绝策略 AbortPolicy(默认策略) 直接抛出RejectedExecutionException异常适用于需要明确知道任务被拒绝的场景 CallerRunsPolicy 由提交任务的线程(调用者线程)直接执行该任务适用于不希望丢失任务且可以接受任务执行变慢的场景 DiscardPolicy 直接静默丢弃被拒绝的任务不做任何处理适用于允许丢失任务的场景 DiscardOldestPolicy 丢弃队列中最老的任务(队列头部的任务)然后尝试重新提交当前任务适用于允许丢弃老任务保留新任务的场景 # 线程池的构造函数 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueueRunnable workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) /*** 创建视频号的线程池* return*/Bean(sphThreadPoolExecutor)public ThreadPoolExecutor createSphThreadPoolExecutor() {return new ThreadPoolExecutor(sphThreadPoolConfig.getCoreSize(),sphThreadPoolConfig.getMaxSize(),sphThreadPoolConfig.getKeepAliveSeconds(),TimeUnit.SECONDS,new LinkedBlockingQueue(sphThreadPoolConfig.getQueueCapacity()),createThreadFactory(sphThreadPoolConfig.getName()),createRejectedHandler(sphThreadPoolConfig.getName()));}/*** 创建拒绝策略** param threadName* return*/private RejectedExecutionHandler createRejectedHandler(String threadName) {return (r, executor) - {log.error(触发{}线程池的拒绝策略 线程池: {}, 活跃线程数: {}, 队列大小: {} 最大线程池数:{}, threadName,executor.toString(), executor.getActiveCount(), executor.getQueue().size(), executor.getMaximumPoolSize());throw new RejectedExecutionException(触发 threadName 线程池的拒绝策略);};}/*** 创建工厂** param name* return*/private ThreadFactory createThreadFactory(String name) {return new ThreadFactory() {private final AtomicInteger threadNumber new AtomicInteger(1);Overridepublic Thread newThread(Runnable r) {return new Thread(r, name threadNumber.getAndIncrement());}};}不丢弃任务的处理方案 规避触发拒绝策略 从线程池工作原理可以知道, 只要把统一时间的任务总数控制在最大核心线程数 阻塞队列长度之内就可以规避触发拒绝策略, 下面是个demo 给各个任务分配同一时间的最大线程数 视频号连接池配置如下, 最大线程数800, 队列长度800, 核心线程400 那么只要控制同一时刻丢入线程池的任务总数少于1600个, 那么就不会触发拒绝策略。 下面的枚举类分配了各个任务同一时刻丢入队列最大数, 累计1050个线程。满负荷运行时堆不满有界队列, 所以线程池只会有400个工作线程也不会触发拒绝。 platform-pool:sph:name: 视频号线程池coreSize: 400maxSize: 800queueCapacity: 800keepAliveSeconds: 60 Getter AllArgsConstructor public enum SphJobEnum {/*** 服务列表*/SPH_ADGROUP_DAY_REPORT(sphDownloadAdgroupDayReportService, 视频号 - 拉取广告报表数据(天级别), 100),SPH_ADGROUP_HOUR_REPORT(sphDownloadAdgroupHourlyReportService, 视频号 - 拉取广告报表数据(小时级别), 300),SPH_REFRESH_TOKEN(sphRefreshTokenService, 视频号 - 刷新token, 1),SPH_DOWNLOAD_ADGROUP(sphDownloadAdgroupService, 视频号 - 下载广告组, 50),SPH_DOWNLOAD_VIDEO(sphDownloadVideoService, 视频号 - 下载视频素材, 100),SPH_DOWNLOAD_PIC(sphDownloadPicService, 视频号 - 下载图片素材, 100),SPH_DOWNLOAD_WECHAT_AUTHORIZATION(sphDownloadWechatChannelsAuthorizationService, 视频号 - 下载获取授权记录列表, 100),SPH_DOWNLOAD_CAMPAIGN(sphDownloadCampaignsService, 视频号 - 下载广告计划(即将下架), 100),SPH_DOWNLOAD_VIDEO_DAY_REPORT(sphDownloadVideoDayReportService, 视频号 - 下载视频日报, 100),SPH_DOWNLOAD_IMAGE_DAY_REPORT(sphDownloadImageDayReportService, 视频号 - 下载图片日报, 100),;private final String jobName;private final String desc;/*** 使用线程池处理, 设置占用的线程数, 避免线程池耗尽* 线程池的设置是 400个核心线程, 800个队列长度, 600个最大线程数.因为采用的是阻塞等待执行完的方式,所以可以设置的最大长度是为 1200个,即填满队列** 300 300 400 50 1050线程, 现成核心线程数400个** */private final Integer dispatchThreadCount;public static SphJobEnum of(String serviceName) {for (SphJobEnum jobEnum : values()) {if (jobEnum.getJobName().equals(serviceName)) {return jobEnum;}}return null;} }编排任务阻塞执行 场景: 各个任务已经分配了线程资源数那么只需要 通过任务编排每次丢入各个任务的最大任务数 调用者线程阻塞等待任务完成 CompletableFuture.join再次丢入线程池自己的最大任务数, 循环直到任务全部完成 private void loopDownloadJob(ListSphOauth2AccessTokenDO validAuthList, String requestId, SphJobEnum jobEnum) {// 1.0 CollectionUtils.partition按照数量分批for (ListSphAuthAdvertiserInfoDO partList : CollectionUtils.partition(advertiserList, jobEnum.getDispatchThreadCount())) {dispatchTask(partList, tokenDO, requestId, jobEnum, oauthCount, oauthCurrentCounter, taskTotalCounter, taskCurrentCounter);}}private void dispatchTask(ListSphAuthAdvertiserInfoDO advertiserList, SphOauth2AccessTokenDO tokenDO,String requestId, SphJobEnum jobEnum, AtomicInteger oauthCount, AtomicInteger oauthCurrentCounter,AtomicInteger taskTotalCounter, AtomicInteger taskCurrentCounter) {ListCompletableFutureString featureList new ArrayList();for (SphAuthAdvertiserInfoDO advertiserInfoDO : advertiserList) {CompletableFutureString feature CompletableFuture.supplyAsync(() - {downloadJob(advertiserInfoDO, tokenDO,authProcess,advertiserProcess,requestId, jobEnum);return authProcess advertiserProcess 执行完毕;}, sphThreadPoolExecutor).exceptionally((ex - {return 执行失败: ex.getCause().getMessage();}));featureList.add(feature);}CompletableFuture.allOf(featureList.toArray(new CompletableFuture[0])).join();}触发拒绝策略之后的处理 自定义拒绝策略, 触发拒绝的任务序列化之后入库 自定义线程池继承ThreadPoolExecutor, 重写afterExecute方法(钩子方法 它会在线程池中的某个任务执行完成无论成功或异常后自动触发), 从数据库中读取数据反序列化之后放入有界队列中 demo 3.1 维护自定义拒绝策略,线程池 public class ReloadableThreadPoolExecutor extends ThreadPoolExecutor {public ReloadableThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueueRunnable workQueue,TaskMapper taskMapper) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);this.taskMapper taskMapper;this.setRejectedExecutionHandler(new DatabaseRejectedHandler(taskMapper));}Overrideprotected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);// 从数据库读取持久化的数据, 并加入队列ListTaskEntity pendingTasks taskMapper.selectPendingTasks();pendingTasks.forEach(task - {Runnable runnable deserializeTask(task.getTaskData());if (super.getQueue().offer(runnable)) {taskMapper.deleteById(task.getId());}});}private static class DatabaseRejectedHandler implements RejectedExecutionHandler {private final TaskMapper taskMapper;public DatabaseRejectedHandler(TaskMapper taskMapper) {this.taskMapper taskMapper;}Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (!executor.isShutdown()) {TaskEntity task new TaskEntity();task.setTaskData(r.toString());taskMapper.insert(task);}}} }3.2 注册线程池 Configuration public class ThreadPoolConfig {Beanpublic ReloadableThreadPoolExecutor taskExecutor(TaskMapper taskMapper) {return new ReloadableThreadPoolExecutor(5, // 核心线程数10, // 最大线程数60, // 空闲线程存活时间TimeUnit.SECONDS,new LinkedBlockingQueue(5), // 故意设置小队列测试拒绝策略taskMapper);} }分布式锁 引入 redisson 规避任务重复执行, 否则对资源的需求是无上限的。为了进入这个问题引入了redisson; 用法如下 优雅关锁 长耗时任务在遇到发版的时候如果没有销毁分布式锁会导致无法进行下一波次的执行。这显然是不合理的下面引入DisposableBean接口解决 Slf4j Component public class DisposableBeanConfig implements DisposableBean {Autowiredprotected RedisLock redisLock;Overridepublic void destroy() throws Exception {ListString lockPrefixList Arrays.asList(ks_download_task_, sph_download_task_, ks_manual_download_task_, sph_manual_download_task_);for (String prefix : lockPrefixList) {redisLock.unlockByPrefix(prefix);log.info(jvm销毁, 释放 prefix 为前缀的分布式锁);}} }平台接口限频处理 开放平台会对api进行限频, 这里引入了guava, 使用com.google.common.util.concurrent.RateLimiter解决限频问题。下面是一个demo 定义各个接口的每分钟令牌数 Getter AllArgsConstructor public enum SphApiEnum {/*** api枚举* */GET_TOKEN(https://api.e.qq.com//, 获取access_token, https://developers.e.qq.com/v3.0/docs/api//token, 1000d),GET_ADVERTISER_LIST(https://api.e.qq.com/v3.0//get, 查询腾讯广告广告主信息, https://developers.e.qq.com/v3.0/docs/api//get, 1000d),GET_DAILY_REPORTS(https://api.e.qq.com/v3.0//get, 查询腾讯广告日报表数据, https://developers.e.qq.com/v3.0/docs/api//get, 1000d),GET_HOURLY_REPORTS(https://api.e.qq.com/v3.0//get, 查询腾讯广告小时报表数据, https://developers.e.qq.com/v3.0/docs/api//get#fdub1, 1000d),GET_AD_GROUPS(https://api.e.qq.com/v3.0//get, 获取广告组, https://developers.e.qq.com/v3.0/docs/api//get, 1000d),GET_VIDEOS(https://api.e.qq.com/v3.0//get, 获取视频文件, https://developers.e.qq.com/v3.0/docs/api//get, 1000d),GET_PIC(https://api.e.qq.com/v3.0//get, 获取图片信息, https://developers.e.qq.com/v3.0/docs/api//get, 1000d),GET_WECHAT_CHANNELS_AUTHORIZATION_LIST(https://api.e.qq.com/v3.0//get, 获取视频号授权记录列表, https://developers.e.qq.com/v3.0/docs/api//get, 1000d),GET_CAMPAIGNS(https://api.e.qq.com/v1.3//get, 获取推广计划即将下线, https://developers.e.qq.com/docs/api//campaigns/campaigns_get?version1.3_preview1#kew0v, 1000d),;private final String url;private final String desc;/*** 接口文档的url* */private final String docUrl;/*** 每分钟的限频次数, 其实视频号每个接口每分钟默认1000次调用, 每天的限频次数:1440000* */private final double qpm; }定义限频管理器 public class RateLimiterManager {/*** 存储限流器*/private static final ConcurrentMapString, RateLimiter API_RATE_LIMITER new ConcurrentHashMap();/*** 查询当前APi的RateLimiter* param apiEnum* return*/public static RateLimiter getApiRateLimiter(SphApiEnum apiEnum) {return API_RATE_LIMITER.computeIfAbsent(apiEnum.name().intern(), key - RateLimiter.create(apiEnum.getQpm()/60, 3, TimeUnit.SECONDS));} }使用demo private K, T SphApBaseRespVOT getApiResp(SphApiEnum apiEnum, K params, ClassT dataType) {String resp ;try {// 获取令牌RateLimiterManager.getApiRateLimiter(apiEnum).acquire();resp HttpUtils.getCallWithPojo(apiEnum.getUrl(), params);return getFormatApiResp(apiEnum, resp, dataType, params);} catch (ServiceException | RetryException e) {saveErrorResponse(apiEnum, params, resp, );throw e;} catch (Exception e) {log.error(视频号api请求失败 {}接口 入参{} 异常信息:{}, apiEnum.getDesc(), params, e.getMessage());saveErrorResponse(apiEnum, params, resp, );throw new ServiceException(e.getMessage(), SPH_API_REQUEST_ERROR.getCode());}}待扩展点 曾供职于某个erp公司 在任职期间主要负责跟电商平台进行订单,商品,库存物流,wms进行数据交付 那么现在基于之前的经验分析下 还可以做哪些升级 todo
http://www.dnsts.com.cn/news/257744.html

相关文章:

  • 学做网站论坛坑人吗网站开发创新点
  • 昆明设计网站建设php 数据录入网站
  • 微网站是什么时候创建的医疗wordpress
  • 棋牌网站开发需要多少钱wordpress触屏主题
  • 做网站后台需要写代码吗创艺装饰公司口碑如何
  • 济阳做网站多少钱惠州网站建设公司推荐乐云seo
  • 韩都衣舍的网站建设产品软文是什么意思
  • 北京网站排名优化公司注册推广赚钱一个80元
  • 网站模板 数据库三只松鼠的网站建设
  • 手机网站被做跳转不用js做网站
  • 系统官网网站模板做网站会提供源代码
  • 百度为什么不收录网站的某个版块怎么建设网站赚钱手机
  • 深圳手机商城网站设计制作手机制作3d动画
  • 营销网站建设步骤济南网站建设的费用
  • 随州网站seo多少钱天元建设集团有限公司财务部电话
  • 购物网站系统建设方案维力安网站建设公司
  • 郑州网站建设方案优化做药的常用网站
  • 有口碑的合肥网站建设wordpress 中英文网站
  • 低价网站制作企业_沈阳做网站
  • 做外贸要建什么网站旅游网站模块分类
  • 建设银行网银盾连接不上网站做网站要注意什么
  • 玉溪网站制作建设网站需要设备
  • 英语外贸网站建设上海工商网查询企业信息查询系统
  • 网站建设管理内容保障制度制作小程序营销平台
  • 肇庆市网站建设网站出问题
  • 学生管理系统 静态网站源码中文在线中文资源
  • phpcms网站音乐代码存放在什么位置用vs2010做的网站
  • 网站首页建设公司创意产品设计及介绍
  • 企业网站的常见服务天津市房地产官网
  • 网站配色风格有哪些女孩子学做网站有前途吗