当前位置: 首页 > news >正文

网站开发的前景wordpress code theme

网站开发的前景,wordpress code theme,北京的互联网企业,谷歌在线搜索消息中间件的对比 消息中间件 ActiveMQ RabbitMQ RocketMQ kafka 开发语言 java erlang java scala 单击吞吐量 万级 万级 10万级 10万级 时效性 ms us ms ms 可用性 高(主从架构) 高(主从架构) 非常高(主从架构) 非常高(主从架构) 消息中间件: acti…消息中间件的对比 消息中间件 ActiveMQ RabbitMQ RocketMQ kafka 开发语言 java erlang java scala 单击吞吐量 万级 万级 10万级 10万级 时效性 ms us ms ms 可用性 高(主从架构) 高(主从架构) 非常高(主从架构) 非常高(主从架构) 消息中间件: activeMQ:javajms协议性能一般吞吐量低。rabbitMQ:erlang(amqp协议)性能好功能丰富吞吐量一般。rocketMQjava性能好吞吐量丰富功能丰富。Kafka: scala吞吐量最大功能单一大数据领域 RocketMQ 是阿里开源的分布式消息中间件跟其它中间件相比RocketMQ 的特点是纯JAVA实现是一套提供了消息生产存储消费全过程API的软件系统。 RocketMQ的作用数据收集、限流削峰、异步解耦 数据收集 分布式系统会产生海量级数据流如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总然后对这些数据流进行大数据分析这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。 限流削峰 MQ可以将系统的超量请求暂存其中以便系统后期可以慢慢进行处理从而避免了请求的丢失或系统被压垮。 异步解耦 上游系统对下游系统的调用若为同步调用则会大大降低系统的吞吐量与并发度且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化一般性做法就是在这两层间添加一个MQ层。 rocketmq.apache.org Broker:经纪人(经理人) Topic主题消息区分分类,虚拟结构 Queue消息队列 Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。 发送消息 发送同步消息 同步消息发送后会用一个返回值也就是MQ服务器接收到消息返回的一个确认这种方式非常安全但是性能就没那么高而在MQ集群中也是要等到所有的从机都复制了消息以后才会返回这种方式适合重要消息的场景 Test void rocketmqProducerTest() throws Exception { //创建生产者 DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName); //连接namesrv producer.setNamesrvAddr(192.168.68.133:9876); //启动 producer.start(); //创建消息 Message message new Message(Topic,消息.getBytes()); SendResult send producer.send(message); System.out.println(发送状态send.getSendStatus()); //关闭生产者 producer.shutdown(); }Test void rocketmqConsumerTest() throws Exception { //创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest); //连接namesrv consumer.setNamesrvAddr(192.168.68.133:9876); //订阅主题 *表示该主题的所有消息 consumer.subscribe(Topic,*);//设置监听器(一直,异步回调方式) consumer.registerMessageListener(new MessageListenerConcurrently() { //消费方法 Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { //业务处理for (MessageExt messageExt : msgs) { System.out.println(消费了new String(messageExt.getBody())); } System.out.println(消费者上下文context); //CONSUME_SUCCESS成功 RECONSUME_LATER失败 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });//启动 consumer.start(); //挂起当前jvm System.in.read(); //关闭 //consumer.shutdown(); } 发送异步消息 异步消息常用在对响应时间敏感的业务场景发送端不能容忍长时间等待Broker的响应。发送完后会有一个异步消息通知 Test void aysncProducerTest() throws Exception { //创建生产者 DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName); //连接namesrv producer.setNamesrvAddr(192.168.68.133:9876); //启动 producer.start(); //创建消息 Message message new Message(aysncTopic,异步消息.getBytes()); producer.send(message, new SendCallback() { Override public void onSuccess(SendResult sendResult) { System.out.println(发送成功sendResult); }Override public void onException(Throwable throwable) { System.err.println(发送失败throwable); } }); System.out.println(执行了); //关闭生产者 //producer.shutdown(); //挂起当前jvm System.in.read(); }Test void aysncConsumerTest() throws Exception { //创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest); //连接namesrv consumer.setNamesrvAddr(192.168.68.133:9876); //订阅主题 *表示该主题的所有消息 consumer.subscribe(aysncTopic,*);//设置监听器(一直,异步回调方式) consumer.registerMessageListener(new MessageListenerConcurrently() { //消费方法 Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { //业务处理for (MessageExt messageExt : msgs) { System.out.println(消费了new String(messageExt.getBody())); } System.out.println(消费者上下文context); //CONSUME_SUCCESS成功 RECONSUME_LATER失败 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });//启动 consumer.start(); //挂起当前jvm System.in.read(); //关闭 //consumer.shutdown(); } 发送单向消息 单向消息发送这种方式不关心发送结果的场景这种方式吞吐量大但存在消息丢失的风险。使用案例日志信息发送 Test void OnewayProducerTest()throws Exception{ //创建生产者 DefaultMQProducer producer new DefaultMQProducer(Oneway); //连接namesrv producer.setNamesrvAddr(192.168.68.133:9876); //启动 producer.start(); //创建消息 Message message new Message(Oneway,单向消息.getBytes()); //发送消息 producer.sendOneway(message); //关闭生产者 producer.shutdown(); }Test void OnewayConsumerTest() throws Exception { //创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest); //连接namesrv consumer.setNamesrvAddr(192.168.68.133:9876); //订阅主题 *表示该主题的所有消息 consumer.subscribe(Oneway,*);//设置监听器(一直,异步回调方式) consumer.registerMessageListener(new MessageListenerConcurrently() { //消费方法 Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { //业务处理for (MessageExt messageExt : msgs) { System.out.println(消费了new String(messageExt.getBody())); } System.out.println(消费者上下文context); //CONSUME_SUCCESS成功 RECONSUME_LATER失败 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动 consumer.start(); //挂起当前jvm System.in.read(); //关闭 //consumer.shutdown(); } 发送延时消息 发送延时消息顾名思义。场景比如淘宝商城下单后并未支付有30分钟未支付订单状态 Test void delayProducerTest()throws Exception{ //创建生产者 DefaultMQProducer producer new DefaultMQProducer(delayGroup); //连接namesrv producer.setNamesrvAddr(192.168.68.133:9876); //启动 producer.start(); //创建消息 Message message new Message(delay,延迟消息.getBytes()); //设置延时 根据官方延时等级 message.setDelayTimeLevel(2); //发送消息 producer.sendOneway(message); //关闭生产者 producer.shutdown(); }Test void delayConsumerTest() throws Exception { //创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest); //连接namesrv consumer.setNamesrvAddr(192.168.68.133:9876); //订阅主题 *表示该主题的所有消息 consumer.subscribe(delay,*);//设置监听器(一直,异步回调方式) consumer.registerMessageListener(new MessageListenerConcurrently() { //消费方法 Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { //业务处理for (MessageExt messageExt : msgs) { System.out.println(消费了new String(messageExt.getBody())); } System.out.println(消费者上下文context); //CONSUME_SUCCESS成功 RECONSUME_LATER失败 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动 consumer.start(); //挂起当前jvm System.in.read(); //关闭 //consumer.shutdown(); } 发送批量消息 批量消息可以一次性发送一组消息 Test void delayProducerTest()throws Exception{ //创建生产者 DefaultMQProducer producer new DefaultMQProducer(delayGroup); //连接namesrv producer.setNamesrvAddr(192.168.68.133:9876); //启动 producer.start(); //创建消息 //批量消息 ListMessage messages Arrays.asList( new Message(delay,批量消息1.getBytes()), new Message(delay,批量消息2.getBytes()), new Message(delay,批量消息3.getBytes()) ); //发送消息 producer.send(messages); //关闭生产者 producer.shutdown(); } 发送带标签消息 RocketMQ提供消息过滤功能可根据业务逻辑区分带有A标签的被A消费带有B标签的被B消费 Test void TagProducerTest()throws Exception{ //创建生产者 DefaultMQProducer producer new DefaultMQProducer(TagGroup); //连接namesrv producer.setNamesrvAddr(192.168.68.133:9876); //启动 producer.start(); //创建消息 //批量消息 Message message1 new Message(tagTopic, tagA, tag标签内容A.getBytes()); Message message2 new Message(tagTopic, tagB, tag标签内容B.getBytes()); //发送消息 producer.send(message1); producer.send(message2); //关闭生产者 producer.shutdown(); }Test void TagAConsumerTest() throws Exception { //创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest); //连接namesrv consumer.setNamesrvAddr(192.168.68.133:9876); //订阅主题 *表示该主题的所有消息 consumer.subscribe(tagTopic,tagA);//设置监听器(一直,异步回调方式) consumer.registerMessageListener(new MessageListenerConcurrently() { //消费方法 Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { //业务处理for (MessageExt messageExt : msgs) { System.out.println(消费了new String(messageExt.getBody())); } System.out.println(消费者上下文context); //CONSUME_SUCCESS成功 RECONSUME_LATER失败 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动 consumer.start(); //挂起当前jvm System.in.read(); //关闭 //consumer.shutdown(); }Test void TagBConsumerTest() throws Exception { //创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest); //连接namesrv consumer.setNamesrvAddr(192.168.68.133:9876); //订阅主题 *表示该主题的所有消息 consumer.subscribe(tagTopic,tagB);//设置监听器(一直,异步回调方式) consumer.registerMessageListener(new MessageListenerConcurrently() { //消费方法 Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { //业务处理for (MessageExt messageExt : msgs) { System.out.println(消费了new String(messageExt.getBody())); } System.out.println(消费者上下文context); //CONSUME_SUCCESS成功 RECONSUME_LATER失败 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动 consumer.start(); //挂起当前jvm System.in.read(); //关闭 //consumer.shutdown(); } 发送顺序消息 发送的消息要保证消息是一定有序的顺序消息发送到同一个队列 实体类Data AllArgsConstructor public class MessageM { private int userID; private String desc; }顺序消息发送到同一个队列private ListMessageM messageMs Arrays.asList( new MessageM(1,下单), new MessageM(1,付款), new MessageM(1,配送), new MessageM(2,下单), new MessageM(2,付款), new MessageM(2,配送) );Test void orderProducerTest() throws Exception { //创建生产者 DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName); //连接namesrv producer.setNamesrvAddr(192.168.211.131:9876); //启动 producer.start();messageMs.forEach(messageM - { //创建消息 Message message new Message(orderMsg,messageM.toString().getBytes()); //发送顺序消息发送到同一个队列 try { //相同的userID去相同的队列 producer.send( message, new MessageQueueSelector() { Override public MessageQueue select(ListMessageQueue list, Message message, Object o) { //选择队列 int hashCode o.toString().hashCode(); int i hashCode % list.size(); return list.get(i); } }, messageM.getUserID()); } catch (MQClientException e) { throw new RuntimeException(e); } catch (RemotingException e) { throw new RuntimeException(e); } catch (MQBrokerException e) { throw new RuntimeException(e); } catch (InterruptedException e) { throw new RuntimeException(e); } }); //关闭生产者 producer.shutdown(); } Test void orderConsumerTest() throws Exception { //创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest); //连接namesrv consumer.setNamesrvAddr(192.168.211.131:9876); //订阅主题 *表示该主题的所有消息 consumer.subscribe(orderMsg,*);//设置监听器(一直,异步回调方式) //MessageListenerConcurrently 并发模式多线程 //MessageListenerOrderly 顺序模式单线程 consumer.registerMessageListener(new MessageListenerOrderly() { //消费方法 Override public ConsumeOrderlyStatus consumeMessage(ListMessageExt list, ConsumeOrderlyContext consumeOrderlyContext) { System.out.println(当前线程IDThread.currentThread().getId()); return ConsumeOrderlyStatus.SUCCESS; } }); //启动 consumer.start(); //挂起当前jvm System.in.read(); //关闭 //consumer.shutdown(); }
http://www.dnsts.com.cn/news/270765.html

相关文章:

  • 网站视频下载脚本浙江省建筑培训网
  • 做外贸都有哪些网站大连网页制作培训学校
  • 青岛网站关键词优化公司wordpress标签里面没文章
  • 网站建设销售怎样网页设计模板html代码班级主题
  • 徐州做网站多少钱网络公司哪家好
  • 网站建设备案策划书公司网站管理制度
  • 网站验收标准国家653工程
  • 怎么创建网站免费的wordpress 公司插件
  • 优秀企业网站设计wordpress好看分页
  • 大连网站运营百度竞价排名查询
  • 网站开发与设计课程设计荆门刚刚发布的
  • 企业网站有哪些企业常州抖音seo
  • 福永公司网站建设商业模式顶层设计案例
  • 网站 语言选择建行信用卡网站
  • 百度云空间能做网站吗百度关键词价格
  • 建德网站设计公司谷歌浏览器chrome官网
  • 网站备案注销查询池州网站优化
  • 网上服装商城网站代码深圳网站建设深icp备
  • 一起做网站可以一件代发吗河南省建设厅网站取消劳务资质
  • 新公司网站建设方案宁波市建设集团股份有限公司
  • 婚介所网站开发费用wordpress目录权限
  • wordpress 嵌入htmlsem优化推广
  • 网站开发 语言网站开发文档百度文库
  • 网站seo优化管理系统云南网站建设模块
  • 用ps如何做网站首页wordpress怎么汉化插件
  • 电商网站排名如何做百度搜索推广
  • 安庆做网站哪个公司好开网店的流程和费用
  • 建设银行积分商城网站红酒网站模板下载
  • 阜阳网站开发公司上海专业网站建设公司有哪些
  • 企业网站建设需要注意什么厦门企业制作网站