当前位置: 首页 > news >正文

网站开发的前景中小微企业查询平台

网站开发的前景,中小微企业查询平台,微信红包封面分销平台,如何做电商创业消息中间件的对比 消息中间件 ActiveMQ RabbitMQ RocketMQ kafka 开发语言 java erlang java scala 单击吞吐量 万级 万级 10万级 10万级 时效性 ms us ms ms 可用性 高(主从架构) 高(主从架构) 非常高(主从架构) 非常高(主从架构) 消息中间件: acti…消息中间件的对比 消息中间件 ActiveMQ RabbitMQ RocketMQ kafka 开发语言 java erlang java scala 单击吞吐量 万级 万级 10万级 10万级 时效性 ms us ms ms 可用性 高(主从架构) 高(主从架构) 非常高(主从架构) 非常高(主从架构) 消息中间件: activeMQ:javajms协议性能一般吞吐量低。rabbitMQ:erlang(amqp协议)性能好功能丰富吞吐量一般。rocketMQjava性能好吞吐量丰富功能丰富。Kafka: scala吞吐量最大功能单一大数据领域 RocketMQ 是阿里开源的分布式消息中间件跟其它中间件相比RocketMQ 的特点是纯JAVA实现是一套提供了消息生产存储消费全过程API的软件系统。 RocketMQ的作用数据收集、限流削峰、异步解耦 数据收集 分布式系统会产生海量级数据流如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总然后对这些数据流进行大数据分析这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。 限流削峰 MQ可以将系统的超量请求暂存其中以便系统后期可以慢慢进行处理从而避免了请求的丢失或系统被压垮。 异步解耦 上游系统对下游系统的调用若为同步调用则会大大降低系统的吞吐量与并发度且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化一般性做法就是在这两层间添加一个MQ层。 rocketmq.apache.org Broker:经纪人(经理人) Topic主题消息区分分类,虚拟结构 Queue消息队列 Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。 发送消息 发送同步消息 同步消息发送后会用一个返回值也就是MQ服务器接收到消息返回的一个确认这种方式非常安全但是性能就没那么高而在MQ集群中也是要等到所有的从机都复制了消息以后才会返回这种方式适合重要消息的场景 Test void rocketmqProducerTest() throws Exception { //创建生产者 DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName); //连接namesrv producer.setNamesrvAddr(192.168.68.133:9876); //启动 producer.start(); //创建消息 Message message new Message(Topic,消息.getBytes()); SendResult send producer.send(message); System.out.println(发送状态send.getSendStatus()); //关闭生产者 producer.shutdown(); }Test void rocketmqConsumerTest() throws Exception { //创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest); //连接namesrv consumer.setNamesrvAddr(192.168.68.133:9876); //订阅主题 *表示该主题的所有消息 consumer.subscribe(Topic,*);//设置监听器(一直,异步回调方式) consumer.registerMessageListener(new MessageListenerConcurrently() { //消费方法 Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { //业务处理for (MessageExt messageExt : msgs) { System.out.println(消费了new String(messageExt.getBody())); } System.out.println(消费者上下文context); //CONSUME_SUCCESS成功 RECONSUME_LATER失败 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });//启动 consumer.start(); //挂起当前jvm System.in.read(); //关闭 //consumer.shutdown(); } 发送异步消息 异步消息常用在对响应时间敏感的业务场景发送端不能容忍长时间等待Broker的响应。发送完后会有一个异步消息通知 Test void aysncProducerTest() throws Exception { //创建生产者 DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName); //连接namesrv producer.setNamesrvAddr(192.168.68.133:9876); //启动 producer.start(); //创建消息 Message message new Message(aysncTopic,异步消息.getBytes()); producer.send(message, new SendCallback() { Override public void onSuccess(SendResult sendResult) { System.out.println(发送成功sendResult); }Override public void onException(Throwable throwable) { System.err.println(发送失败throwable); } }); System.out.println(执行了); //关闭生产者 //producer.shutdown(); //挂起当前jvm System.in.read(); }Test void aysncConsumerTest() throws Exception { //创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest); //连接namesrv consumer.setNamesrvAddr(192.168.68.133:9876); //订阅主题 *表示该主题的所有消息 consumer.subscribe(aysncTopic,*);//设置监听器(一直,异步回调方式) consumer.registerMessageListener(new MessageListenerConcurrently() { //消费方法 Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { //业务处理for (MessageExt messageExt : msgs) { System.out.println(消费了new String(messageExt.getBody())); } System.out.println(消费者上下文context); //CONSUME_SUCCESS成功 RECONSUME_LATER失败 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });//启动 consumer.start(); //挂起当前jvm System.in.read(); //关闭 //consumer.shutdown(); } 发送单向消息 单向消息发送这种方式不关心发送结果的场景这种方式吞吐量大但存在消息丢失的风险。使用案例日志信息发送 Test void OnewayProducerTest()throws Exception{ //创建生产者 DefaultMQProducer producer new DefaultMQProducer(Oneway); //连接namesrv producer.setNamesrvAddr(192.168.68.133:9876); //启动 producer.start(); //创建消息 Message message new Message(Oneway,单向消息.getBytes()); //发送消息 producer.sendOneway(message); //关闭生产者 producer.shutdown(); }Test void OnewayConsumerTest() throws Exception { //创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest); //连接namesrv consumer.setNamesrvAddr(192.168.68.133:9876); //订阅主题 *表示该主题的所有消息 consumer.subscribe(Oneway,*);//设置监听器(一直,异步回调方式) consumer.registerMessageListener(new MessageListenerConcurrently() { //消费方法 Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { //业务处理for (MessageExt messageExt : msgs) { System.out.println(消费了new String(messageExt.getBody())); } System.out.println(消费者上下文context); //CONSUME_SUCCESS成功 RECONSUME_LATER失败 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动 consumer.start(); //挂起当前jvm System.in.read(); //关闭 //consumer.shutdown(); } 发送延时消息 发送延时消息顾名思义。场景比如淘宝商城下单后并未支付有30分钟未支付订单状态 Test void delayProducerTest()throws Exception{ //创建生产者 DefaultMQProducer producer new DefaultMQProducer(delayGroup); //连接namesrv producer.setNamesrvAddr(192.168.68.133:9876); //启动 producer.start(); //创建消息 Message message new Message(delay,延迟消息.getBytes()); //设置延时 根据官方延时等级 message.setDelayTimeLevel(2); //发送消息 producer.sendOneway(message); //关闭生产者 producer.shutdown(); }Test void delayConsumerTest() throws Exception { //创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest); //连接namesrv consumer.setNamesrvAddr(192.168.68.133:9876); //订阅主题 *表示该主题的所有消息 consumer.subscribe(delay,*);//设置监听器(一直,异步回调方式) consumer.registerMessageListener(new MessageListenerConcurrently() { //消费方法 Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { //业务处理for (MessageExt messageExt : msgs) { System.out.println(消费了new String(messageExt.getBody())); } System.out.println(消费者上下文context); //CONSUME_SUCCESS成功 RECONSUME_LATER失败 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动 consumer.start(); //挂起当前jvm System.in.read(); //关闭 //consumer.shutdown(); } 发送批量消息 批量消息可以一次性发送一组消息 Test void delayProducerTest()throws Exception{ //创建生产者 DefaultMQProducer producer new DefaultMQProducer(delayGroup); //连接namesrv producer.setNamesrvAddr(192.168.68.133:9876); //启动 producer.start(); //创建消息 //批量消息 ListMessage messages Arrays.asList( new Message(delay,批量消息1.getBytes()), new Message(delay,批量消息2.getBytes()), new Message(delay,批量消息3.getBytes()) ); //发送消息 producer.send(messages); //关闭生产者 producer.shutdown(); } 发送带标签消息 RocketMQ提供消息过滤功能可根据业务逻辑区分带有A标签的被A消费带有B标签的被B消费 Test void TagProducerTest()throws Exception{ //创建生产者 DefaultMQProducer producer new DefaultMQProducer(TagGroup); //连接namesrv producer.setNamesrvAddr(192.168.68.133:9876); //启动 producer.start(); //创建消息 //批量消息 Message message1 new Message(tagTopic, tagA, tag标签内容A.getBytes()); Message message2 new Message(tagTopic, tagB, tag标签内容B.getBytes()); //发送消息 producer.send(message1); producer.send(message2); //关闭生产者 producer.shutdown(); }Test void TagAConsumerTest() throws Exception { //创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest); //连接namesrv consumer.setNamesrvAddr(192.168.68.133:9876); //订阅主题 *表示该主题的所有消息 consumer.subscribe(tagTopic,tagA);//设置监听器(一直,异步回调方式) consumer.registerMessageListener(new MessageListenerConcurrently() { //消费方法 Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { //业务处理for (MessageExt messageExt : msgs) { System.out.println(消费了new String(messageExt.getBody())); } System.out.println(消费者上下文context); //CONSUME_SUCCESS成功 RECONSUME_LATER失败 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动 consumer.start(); //挂起当前jvm System.in.read(); //关闭 //consumer.shutdown(); }Test void TagBConsumerTest() throws Exception { //创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest); //连接namesrv consumer.setNamesrvAddr(192.168.68.133:9876); //订阅主题 *表示该主题的所有消息 consumer.subscribe(tagTopic,tagB);//设置监听器(一直,异步回调方式) consumer.registerMessageListener(new MessageListenerConcurrently() { //消费方法 Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { //业务处理for (MessageExt messageExt : msgs) { System.out.println(消费了new String(messageExt.getBody())); } System.out.println(消费者上下文context); //CONSUME_SUCCESS成功 RECONSUME_LATER失败 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动 consumer.start(); //挂起当前jvm System.in.read(); //关闭 //consumer.shutdown(); } 发送顺序消息 发送的消息要保证消息是一定有序的顺序消息发送到同一个队列 实体类Data AllArgsConstructor public class MessageM { private int userID; private String desc; }顺序消息发送到同一个队列private ListMessageM messageMs Arrays.asList( new MessageM(1,下单), new MessageM(1,付款), new MessageM(1,配送), new MessageM(2,下单), new MessageM(2,付款), new MessageM(2,配送) );Test void orderProducerTest() throws Exception { //创建生产者 DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName); //连接namesrv producer.setNamesrvAddr(192.168.211.131:9876); //启动 producer.start();messageMs.forEach(messageM - { //创建消息 Message message new Message(orderMsg,messageM.toString().getBytes()); //发送顺序消息发送到同一个队列 try { //相同的userID去相同的队列 producer.send( message, new MessageQueueSelector() { Override public MessageQueue select(ListMessageQueue list, Message message, Object o) { //选择队列 int hashCode o.toString().hashCode(); int i hashCode % list.size(); return list.get(i); } }, messageM.getUserID()); } catch (MQClientException e) { throw new RuntimeException(e); } catch (RemotingException e) { throw new RuntimeException(e); } catch (MQBrokerException e) { throw new RuntimeException(e); } catch (InterruptedException e) { throw new RuntimeException(e); } }); //关闭生产者 producer.shutdown(); } Test void orderConsumerTest() throws Exception { //创建消费者 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest); //连接namesrv consumer.setNamesrvAddr(192.168.211.131:9876); //订阅主题 *表示该主题的所有消息 consumer.subscribe(orderMsg,*);//设置监听器(一直,异步回调方式) //MessageListenerConcurrently 并发模式多线程 //MessageListenerOrderly 顺序模式单线程 consumer.registerMessageListener(new MessageListenerOrderly() { //消费方法 Override public ConsumeOrderlyStatus consumeMessage(ListMessageExt list, ConsumeOrderlyContext consumeOrderlyContext) { System.out.println(当前线程IDThread.currentThread().getId()); return ConsumeOrderlyStatus.SUCCESS; } }); //启动 consumer.start(); //挂起当前jvm System.in.read(); //关闭 //consumer.shutdown(); }
http://www.dnsts.com.cn/news/240294.html

相关文章:

  • 百度站长平台安卓版最新发布地址跳转
  • 如何自己做自己的网站注册公司最少需要多少钱
  • 美容院门户网站开发网站 搜索引擎 提交
  • 怎样免费做网站推广建一个网页网站
  • 深圳网站制作公司怎么样免费咨询大夫
  • 网站建设一样注意什么表白视频制作
  • 番禺网站建设培训网络建设公司方案
  • 网站首页标题友情链接工具
  • 网站建设价格标准信息涿州网站建设有限公司
  • 云南网站建设首选公司太原小程序开发定制
  • 网站开发所需要的条件廊坊百度快照优化排名
  • 建设银行官方网站个人系统板块做公寓酒店跟网站合作有什么技巧
  • 洛阳网站建设哪家便宜公司可以备案几个网站
  • 公司网站下二级站点如何做成品在线网站免费入口
  • 做seo网站优化多少钱怎样设计一个logo
  • 云南集优科技网站中国纪检监察报电子报
  • 在大网站做网页广告需要多少钱十堰网站建设怎么样
  • 网站备案了有什么好处vue快速建站
  • 网站开发产品经理小刘网站建设
  • 网站建设综合实训总结与体会做梯子的企业网站
  • 回收手机的网站哪家好甜点网站建设的功能及意义
  • 冠县网站建设网站建设电话
  • 实验室网站建设百度做一个网站怎么做呢
  • 合肥网站建设工作室兰州市网站
  • 医疗 网站前置审批番禺网站建设技术
  • 做爰网站下载地址百姓网招聘最新招聘信息
  • 网站开发维护协议天津黑曼巴网站建设
  • 建设安全带官方网站淄博电商网站建设
  • 电商型网站是否是趋势如何设计一个企业网站
  • 查找网站空间商网络营销平台有哪些特点