当前位置: 首页 > news >正文

建站公司的服务内容手机网站建设服务器

建站公司的服务内容,手机网站建设服务器,模板网站怎么做才美观,圣都装饰的口碑怎么样队列这种数据结构都不陌生#xff0c;特点就是先进先出。有很多常用的消息中间件可以有现成的该部分功能#xff0c;这里使用zookeeper基于发布订阅模式来实现分布式队列。对应的会有一个生产者和一个消费者。 这里理论上还是使用顺序节点。生产者不断产生新的顺序子节点特点就是先进先出。有很多常用的消息中间件可以有现成的该部分功能这里使用zookeeper基于发布订阅模式来实现分布式队列。对应的会有一个生产者和一个消费者。 这里理论上还是使用顺序节点。生产者不断产生新的顺序子节点消费者watcher监听节点新增事件来消费消息。 生产者 CuratorFramework client ... client.start(); String path /testqueue; client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path,11.getBytes()) 消费者 CuratorFramework client ... client.start(); String path /testqueue; PathChildrenCache pathCache new PathChildrenCache(client,path,true); pathCache.getListenable().addListener(new PathChildrenCacheListener() {Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {if(event.getType() PathChildrenCacheEvent.Type.CHILD_ADDED){ChildData data event.getData();//handle msgclient.delete().forPath(data.getPath());}} }); pathCache.start();使用curator queue 先来使用基本的队列类DistributedQueue。 DistributedQueue的初始化需要提交准备几个参数 client连接就不多说了: CuratorFramework client ...QueueSerializer这个主要是用来指定对消息data进行序列化和反序列化 这里就搞一个简单的字符串类型 QueueSerializerString serializer new QueueSerializerString() {Overridepublic byte[] serialize(String item) {return item.getBytes();}Overridepublic String deserialize(byte[] bytes) {return new String(bytes);} };QueueConsumer消息consumer当有新消息来的时候会调用consumer.consumeMessage()来处理消息 这里也搞个简单的string类型的处理consumer QueueConsumerString consumer new QueueConsumerString() {Overridepublic void consumeMessage(String s) throws Exception {System.out.println(receive msg:s);}Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {//TODO} };队列消息发布 //队列节点路径 String queuePath /queue; //使用上面准备的几个参数构造DistributedQueue对象 DistributedQueueString queue QueueBuilder.builder(client,consumer,serializer,queuePath).buildQueue(); queue.start(); //调用put方法生产消息 queue.put(hello); queue.put(msg); Thread.sleep(2000); queue.put(3);这样在启动测试程序在consumer的consumeMessage方法就会收到queue.put的消息。 这里有个问题有没有发现在初始化queue的时候需要指定consumer那岂不是只能同一个程序中生产消费何来的分布式 其实这里在queue对象创建的时候consumer可以为null这个时候queue就只生产消息。具体的逻辑需要看下DistributedQueue类的源码。 在DistributedQueue类的构造函数有一步设置isProducerOnly属性 isProducerOnly (consumer null);然后在start()方法会根据isProducerOnly来判断启动方式 if ( !isProducerOnly || (maxItems ! QueueBuilder.NOT_SET) ) {childrenCache.start(); }if ( !isProducerOnly ) {service.submit(new CallableObject(){Overridepublic Object call(){runLoop();return null;}}); }这里看到consumer为空两个if不成立不会初始化对那个的消息消费逻辑wather监听。只需要在另一个程序里创建queue启动时指定consumer即可。 源码分析 先从消息的发布也就是put方法 首先调用makeItemPath()获取创建节点路径 ZKPaths.makePath(queuePath, QUEUE_ITEM_NAME);这里QUEUE_ITEM_NAME“queue-”。 然后调用internalPut()方法来创建节点路径 //先累加消息数量putCount putCount.incrementAndGet(); //使用serializer序列化消息数据 byte[] bytes ItemSerializer.serialize(multiItem, serializer); //根据background来创建节点 if ( putInBackground ) {doPutInBackground(item, path, givenMultiItem, bytes); } else {doPutInForeground(item, path, givenMultiItem, bytes); }看doPutInForeground里就是具体的创建节点了 //创建节点 client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, bytes); //哦错了这里putCount不是总消息数是正在创建消息数创建完再回减 synchronized(putCount) {putCount.decrementAndGet();putCount.notifyAll(); }//如果有对应的lisener依次调用 putListenerContainer.forEach(listener - {if ( item ! null ){listener.putCompleted(item);}else{listener.putMultiCompleted(givenMultiItem);} });消息的发布就完成了。 然后是消息的consumer这里肯定是使用的watcher。这里还是回到前面start方法处根据isProducerOnly属性判断有两步操作 1、childrenCache.start(); childrenCache初始化是在queue的构造函数里 childrenCache new ChildrenCache(client, queuePath)其start方法会调用 private final CuratorWatcher watcher new CuratorWatcher() {Overridepublic void process(WatchedEvent event) throws Exception{if ( !isClosed.get() ){sync(true);}} };private final BackgroundCallback callback new BackgroundCallback(){Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( event.getResultCode() KeeperException.Code.OK.intValue() ){setNewChildren(event.getChildren());}}};void start() throws Exception{sync(true);}private synchronized void sync(boolean watched) throws Exception{if ( watched ){//走这里client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);}else{client.getChildren().inBackground(callback).forPath(path);}} 这里先把代码都贴上看到内部定义了一个watcher和callback。这里inBackground就是watcher到事件使用callback进行处理最后是调用到setNewChildren方法 private synchronized void setNewChildren(ListString newChildren) {if ( newChildren ! null ){Data currentData children.get();//将数据设置到children变量里消息版本1children.set(new Data(newChildren, currentData.version 1));//notifyAll() 等待线程获取消息notifyFromCallback();} }这里有引入了一个children变量然后将数据设置到了该变量里。 private final AtomicReferenceData children new AtomicReferenceData(new Data(Lists.StringnewArrayList(), 0));children其实是线程间通信一个共享数据容器变量。这里设置了数据然后具体的数据消费在下一步。 2、线程池里丢了个任务去执行runLoop();方法。 回到DistributedQueue.start的第二步执行runLoop()方法看名字就应该知道了一直轮询获取消息。 还是来看代码吧 private void runLoop() {long currentVersion -1;long maxWaitMs -1;//while一直轮询while ( state.get() State.STARTED ){try{//从childrenCache里获取数据ChildrenCache.Data data (maxWaitMs 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);currentVersion data.version;ListString children Lists.newArrayList(data.children);sortChildren(children); // makes sure items are processed in the correct orderif ( children.size() 0 ){maxWaitMs getDelay(children.get(0));if ( maxWaitMs 0 ){continue;}}else{continue;}/**处理数据 这里取出消息后会删除节点然后使用serializer反序列化节点数据调用consumer.consumeMessage来处理消息**/processChildren(children, currentVersion);}}} }这里获取数据使用了childrenCache.blockingNextGetData synchronized Data blockingNextGetData(long startVersion, long maxWait, TimeUnit unit) throws InterruptedException {long startMs System.currentTimeMillis();boolean hasMaxWait (unit ! null);long maxWaitMs hasMaxWait ? unit.toMillis(maxWait) : -1;//数据版本没变一直wait等待while ( startVersion children.get().version ){if ( hasMaxWait ){long elapsedMs System.currentTimeMillis() - startMs;long thisWaitMs maxWaitMs - elapsedMs;if ( thisWaitMs 0 ){break;}wait(thisWaitMs);}else{wait();}}return children.get(); }这里就有wait阻塞等消息当消息来时候会被唤醒。 其它类型队列 curator对优先队列(DistributedPriorityQueue)、延迟队列(DistributedDelayQueue)都有对应的实现有兴趣的自己看吧。
http://www.dnsts.com.cn/news/166772.html

相关文章:

  • 专业网站建设常州北京营销型网站案例
  • app网站开发学习主机公园安装wordpress要多久
  • 唐山网站建设唐山做网站建立自己的网站软件有
  • 游戏网站建设网wordpress 3.2
  • 创建网站的目的泰安房产查询系统
  • 交流网站模版新乐市建设银行网站
  • 做新网站怎样提交360广州公司注册无地址
  • 2017国外优秀网站模版代理公司注册网站
  • 网站建设与网页设计试卷上海哪家做网站好
  • 备案网站名称攻略网络设计师干什么的
  • 线上教学网站宁波城乡住房建设厅网站
  • 国内最大ae模板下载网站广州网站设计价格
  • 西安网站建设发布古建设计素材网站
  • 没有网站怎么推广有没有一个网站做黄油视频
  • 那个网站专利分析做的好百度上怎么发布作品
  • 建网站需要学什么门户网站首页模板下载
  • 厦门网站定制深圳教育集团网站建设
  • 178网站建设网站网站开发公司
  • 杭州网站设计推荐柚米美术馆网站的建设流程
  • 艺术学校网站模板wordpress 超简洁主题
  • 电商网站 建设步骤如何做旅游网站推广
  • 电商网站设计推荐亿企邦洞泾做网站
  • 珠海自助建站软件网站开发的技术路线
  • 成品网站分享一下百度收录不了网站
  • 初期网站开发费会计分录深圳互联网推广
  • 杭州网站建设q479185700棒北京建设网站的公司哪家好
  • 计算机(网站建设与维护)一般网站维护需要做什么
  • 别人发我网站外链会降权我吗网站建设如何选择
  • 国外自助建站系统建筑建设网站
  • 论述题亿唐网不做网站做品牌设计店名logo