网站建设主管的策划案,怎样将qq空间建设为个人网站,十大暗网搜索引擎,网站建设推广服务费的税率是文章目录一、什么是响应式编程1、Java的流和响应式流2、Java中响应式的使用3、Reactor中响应式流的基本接口4、Reactor中响应式接口的基本使用二、初始Reactor1、Flux和Mono的基本介绍2、引入Reactor依赖3、响应式类型的创建4、响应式类型的组合#xff08;1#xff09;使用m…
文章目录一、什么是响应式编程1、Java的流和响应式流2、Java中响应式的使用3、Reactor中响应式流的基本接口4、Reactor中响应式接口的基本使用二、初始Reactor1、Flux和Mono的基本介绍2、引入Reactor依赖3、响应式类型的创建4、响应式类型的组合1使用mergeWith合并响应式流2使用zip压缩合并响应式流3使用zip压缩合并为自定义对象的响应式流4选择第⼀个反应式类型进⾏发布5、转换和过滤反应式流1skip操作跳过指定数⽬的消息2skip()操作的另⼀种形式3take操作只发布第⼀批指定数量的数据项4take操作的另一种形式5filter操作自定义过滤条件6distinct操作去重7map操作映射新元素8flatMap将流转成新的流9buffer操作现将数据流拆分为小块10collectList操作也可以将所有数据收集到一个List11collectMap 操作产生⼀个发布Map的Mono6、在反应式类型上执行逻辑操作1⽤all()⽅法来确保Flux中的所有消息都满⾜某些条件2⽤any()⽅法来确保Flux中⾄少有⼀个消息满⾜某些条件7、在反应式类型上使用Subscriber订阅1使用Subscriber消费消息2使用Flux的doOnNext处理数据8、使用then来处理完成数据返回写在后面一、什么是响应式编程
响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流而相关的计算模型会自动将变化的值通过数据流进行传播。
在开发应⽤程序代码时我们可以编写两种⻛格的代码即命令式和响应式。
命令式Imperative的代码它由⼀组任务组成每次只运⾏⼀项任务每项任务⼜都依赖于前⾯的任务。数据会按批次进⾏处理在前⼀项任务还没有完成对当前数据批次的处理时不能将这些数据递交给下⼀项处理任务。
响应式Reactive的代码它定义了⼀组⽤来处理数据的任务但是这些任务可以并⾏地执⾏。每项任务处理数据的⼀部分⼦集并将结果交给处理流程中的下⼀项任务同时继续处理数据的另⼀部分⼦集。
Reactor 是⼀个响应式编程库同时也是Spring家族的⼀部分。它是Spring 5反应式编程功能的基础。
1、Java的流和响应式流
Java的Stream流通常都是同步的并且只能处理有限的数据集。从本质上来说它们只是使⽤函数来对集合进⾏迭代的⼀种⽅式。
响应式流⽀持异步处理任意⼤⼩的数据集同样也包括⽆限数据集。只要数据就绪它们就能实时地处理数据并且能够通过回压来避免压垮数据的消费者。
2、Java中响应式的使用
JDK1.8时是基于Observer/Observable接口而实现的观察者模式
ObserverDemo observer new ObserverDemo();
// 添加观察者
observer.addObserver(new Observer() {Overridepublic void update(Observable o, Object arg) {System.out.println(发生了变化);}
});
observer.addObserver(new Observer() {Overridepublic void update(Observable o, Object arg) {System.out.println(收到了通知);}
});
observer.setChanged(); // 数据变化
observer.notifyObservers(); // 通知JDK9及以后Observer/Observable接口就被弃用了取而代之的是Flow类
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;public class FlowDemo {public static void main(String[] args) throws Exception {// 1. 定义发布者, 发布的数据类型是 Integer// 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口SubmissionPublisherInteger publiser new SubmissionPublisherInteger();// 2. 定义订阅者SubscriberInteger subscriber new SubscriberInteger() {private Subscription subscription;Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription subscription;// 请求一个数据this.subscription.request(1);}Overridepublic void onNext(Integer item) {// 接受到一个数据, 处理System.out.println(接受到数据: item);try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println(处理完了!);}};// 3. 发布者和订阅者 建立订阅关系publiser.subscribe(subscriber);// 4. 生产数据, 并发布// 这里忽略数据生产过程for (int i 0; i 1000; i) {System.out.println(生成数据: i);// submit是个block方法publiser.submit(i);}// 5. 结束后 关闭发布者// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭publiser.close();// 主线程延迟停止, 否则数据没有消费就退出Thread.currentThread().join(1000);// debug的时候, 下面这行需要有断点// 否则主线程结束无法debugSystem.out.println();}}import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;/**
* 带 process 的 flow demo
*//**
* Processor, 需要继承SubmissionPublisher并实现Processor接口
*
* 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
*/
class MyProcessor extends SubmissionPublisherStringimplements ProcessorInteger, String {private Subscription subscription;Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription subscription;// 请求一个数据this.subscription.request(1);}Overridepublic void onNext(Integer item) {// 接受到一个数据, 处理System.out.println(处理器接受到数据: item);// 过滤掉小于0的, 然后发布出去if (item 0) {this.submit(转换后的数据: item);}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println(处理器处理完了!);// 关闭发布者this.close();}
}public class FlowDemo2 {public static void main(String[] args) throws Exception {// 1. 定义发布者, 发布的数据类型是 Integer// 直接使用jdk自带的SubmissionPublisherSubmissionPublisherInteger publiser new SubmissionPublisherInteger();// 2. 定义处理器, 对数据进行过滤, 并转换为String类型MyProcessor processor new MyProcessor();// 3. 发布者 和 处理器 建立订阅关系publiser.subscribe(processor);// 4. 定义最终订阅者, 消费 String 类型数据SubscriberString subscriber new SubscriberString() {private Subscription subscription;Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription subscription;// 请求一个数据this.subscription.request(1);}Overridepublic void onNext(String item) {// 接受到一个数据, 处理System.out.println(接受到数据: item);// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println(处理完了!);}};// 5. 处理器 和 最终订阅者 建立订阅关系processor.subscribe(subscriber);// 6. 生产数据, 并发布// 这里忽略数据生产过程publiser.submit(-111);publiser.submit(111);// 7. 结束后 关闭发布者// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭publiser.close();// 主线程延迟停止, 否则数据没有消费就退出Thread.currentThread().join(1000);}}3、Reactor中响应式流的基本接口
响应式流规范可以总结为4个接⼝Publisher、Subscriber、Subscription和Processor。
Publisher负责⽣成数据并将数据发送给 Subscription每个Subscriber对应⼀个Subscription。
public interface PublisherT {// Publisher接⼝声明了⼀个⽅法 subscribe()Subscriber可以通过该⽅法向 Publisher发起订阅。public void subscribe(Subscriber? super T s);
}⼀旦Subscriber订阅成功就可以接收来⾃Publisher的事件。
public interface SubscriberT {// Subscriber的第⼀个事件是通过对 onSubscribe()⽅法的调⽤接收的。public void onSubscribe(Subscription s);// 每个数据项都会通过该方法处理public void onNext(T t);// 异常处理public void onError(Throwable t);// 结束public void onComplete();
}
Publisher调⽤ onSubscribe() ⽅法时会将Subscription对象传递给 Subscriber。
通过SubscriptionSubscriber可以管理其订阅情况
public interface Subscription {// Subscriber可以通过调⽤ request()⽅法来请求 Publisher 发送数据可以传⼊⼀个long类型的数值以表明它愿意接受多少数据// 这也是回压能够发挥作⽤的地⽅以避免Publisher 发送多于 Subscriber能够处理的数据量public void request(long n);// 调⽤ cancel()⽅法表明它不再对数据感兴趣并且取消订阅public void cancel();
}
Subscriber 请求数据之后数据就会开始流经响应式流调用onNext方法。
Processor接⼝它是Subscriber和Publisher的组合
public interface ProcessorT, R extends SubscriberT, PublisherR {
}4、Reactor中响应式接口的基本使用
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;public class ReactorDemo {public static void main(String[] args) {// reactor jdk8 stream jdk9 reactive stream// Mono 0-1个元素// Flux 0-N个元素String[] strs { 1, 2, 3 };// 2. 定义订阅者SubscriberInteger subscriber new SubscriberInteger() {private Subscription subscription;Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription subscription;// 请求一个数据this.subscription.request(1);}Overridepublic void onNext(Integer item) {// 接受到一个数据, 处理System.out.println(接受到数据: item);try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println(处理完了!);}};// 这里就是jdk8的streamFlux.fromArray(strs).map(s - Integer.parseInt(s))// 最终操作// 这里就是jdk9的reactive stream.subscribe(subscriber);}
}二、初始Reactor
1、Flux和Mono的基本介绍
Reactor中有两个核心类Mono和Flux。Flux和Mono是Reactor提供的最基础的构建块⽽这两种响应式类型所提供的操作符则是组合使⽤它们以构建数据流动管线的黏合剂。
这两个类实现接口Publisher提供丰富操作符。Flux对象实现发布者返回N个元素Mono实现发布者返回0或者1个元素。
Flux和Mono都是数据流的发布者使用Flux和Mono都可以发出三种数据信号元素值、错误信号、完成信号错误信号和完成信号都代表终止信号终止信号用于告诉订阅者数据流结束了错误信号终止数据流同时把错误信息传递给订阅者。
Flux和Mono共有500多个操作这些操作都可以⼤致归类为创建操作组合操作转换操作逻辑操作。
注意Mono和Flux的很多操作是相同的只不过对应的数据数量不同所以本文更多的操作都是基于Flux的Mono也同理。 2、引入Reactor依赖
需要引入reactor-core核心包和测试包。
dependencygroupIdio.projectreactor/groupIdartifactIdreactor-core/artifactIdversion3.x.x/version
/dependencydependencygroupIdio.projectreactor/groupIdartifactIdreactor-test/artifactIdversion3.x.x/versionscopetest/scope
/dependency
3、响应式类型的创建
Reactor提供了多种创建Flux和Mono的操作。
// 使⽤Flux或Mono上的静态 just()⽅法来创建⼀个响应式类型
Mono.just(1);
FluxString fruitFlux Flux.just(Apple, Orange, Grape, Banana, Strawberry);
// 调用just或其他方法只是声明数据流数据流并没有发出只有进行订阅之后才会触发数据流不订阅什么都不会发生的。
// 添加一个订阅者subscribe的方法参数相当于是一个Consumer
fruitFlux.subscribe(f - System.out.println(Heres some fruit: f)
);// 根据集合创建
String[] fruits new String[] {Apple, Orange, Grape, Banana, Strawberry };
FluxString fruitFlux2 Flux.fromArray(fruits);
ListString list Arrays.asList(fruits);
Flux.fromIterable(list); // 集合StreamString stream list.stream();
Flux.fromStream(stream); // stream流// 根据区间创建1-5
FluxInteger intervalFlux Flux.range(1, 5);
intervalFlux.subscribe(f - System.out.println(data is : f)
);
// 每秒发布⼀个值的Flux通过interval()⽅法创建的Flux会从0开始发布值并且后续的条⽬依次递增。
// 因为interval()⽅法没有指定最⼤值所以它可能会永远运⾏。我们也可以使⽤take()⽅法将结果限制为前5个条⽬。
FluxLong intervalFlux2 Flux.interval(Duration.ofSeconds(1)).take(5);
intervalFlux2.subscribe(f - System.out.println(data2 is : f)
);// 阻塞等待结果
Thread.sleep(100000);4、响应式类型的组合
1使用mergeWith合并响应式流 FluxString characterFlux Flux.just(Garfield, Kojak, Barbossa).delayElements(Duration.ofMillis(500)); // 每500毫秒发布⼀个数据FluxString foodFlux Flux.just(Lasagna, Lollipops, Apples).delaySubscription(Duration.ofMillis(250)) // 订阅后250毫秒后开始发布数据.delayElements(Duration.ofMillis(500)); // 每500毫秒发布⼀个数据// 使⽤mergeWith()⽅法将两个Flux合并合并过后的Flux数据项发布顺序与源Flux的发布时间⼀致
// Garfield Lasagna Kojak Lollipops Barbossa Apples
FluxString mergedFlux characterFlux.mergeWith(foodFlux);mergedFlux.subscribe(System.out::println);// 阻塞等待结果
Thread.sleep(100000);我们发现使用mergeWith合并过的两个FLux并没有严格意义上的先后之分谁产生了数据就接着消费与同一个无异。
2使用zip压缩合并响应式流 FluxString characterFlux Flux.just(Garfield, Kojak, Barbossa);
FluxString foodFlux Flux.just(Lasagna, Lollipops, Apples);
// 当两个Flux对象压缩在⼀起的时候它将会产⽣⼀个新的发布元组的Flux其中每个元组中都包含了来⾃每个源Flux的数据项
// 这个合并后的Flux发出的每个条⽬都是⼀个Tuple2⼀个容纳两个其他对象的容器对象的实例其中包含了来⾃每个源Flux的数据项并保持着它们发布的顺序。
FluxTuple2String, String zippedFlux Flux.zip(characterFlux, foodFlux);zippedFlux.subscribe(t - {System.out.println(t.getT1() | t.getT2());
});
/*** 执行结果* Garfield|Lasagna* Kojak|Lollipops* Barbossa|Apples*/3使用zip压缩合并为自定义对象的响应式流
如果你不想使⽤Tuple2⽽想要使⽤其他类型就可以为zip()⽅法提供⼀个合并函数来⽣成你想要的任何对象合并函数会传⼊这两个数据项。 zip操作的另⼀种形式从每个传⼊Flux中各取⼀个元素然后创建消息对象并产⽣这些消息组成的Flux
FluxString characterFlux Flux.just(Garfield, Kojak, Barbossa);
FluxString foodFlux Flux.just(Lasagna, Lollipops, Apples);// 压缩成自定义对象
FluxString zippedFlux Flux.zip(characterFlux, foodFlux, (c, f) - c eats f);
zippedFlux.subscribe(System.out:: println);/*** 执行结果* Garfield eats Lasagna* Kojak eats Lollipops* Barbossa eats Apples*/4选择第⼀个反应式类型进⾏发布
假设我们有两个Flux对象此时我们不想将它们合并在⼀起⽽是想要创建⼀个新的Flux让这个新的Flux从第⼀个产⽣值的Flux中发布值。first()操作会在两个Flux对象中选择第⼀个发布值的Flux并再次发布它的值。
FluxString slowFlux Flux.just(tortoise, snail, sloth).delaySubscription(Duration.ofMillis(100)); // 延迟100ms
FluxString fastFlux Flux.just(hare, cheetah, squirrel);
// 选择第⼀个反应式类型进⾏发布
FluxString firstFlux Flux.first(slowFlux, fastFlux);
firstFlux.subscribe(System.out::println);
// 阻塞等待结果
Thread.sleep(100000);
/*** 执行结果* hare* cheetah* squirrel*/5、转换和过滤反应式流
在数据流经⼀个流时我们通常需要过滤掉某些值并对其他的值进⾏处理。
1skip操作跳过指定数⽬的消息
skip操作跳过指定数⽬的消息并将剩下的消息继续在结果Flux上进⾏传递
// 跳过3个并创建一个新的Flux
FluxString skipFlux Flux.just(one, two, skip a few, ninety nine, one hundred).skip(3);
skipFlux.subscribe(System.out::println);
/*** 执行结果* ninety nine* one hundred*/2skip()操作的另⼀种形式
在⼀段时间之内跳过所有的第⼀批数据。
// 这是skip()操作的另⼀种形式将会产⽣⼀个新Flux在发布来⾃源Flux的数据项之前等待指定的⼀段时间
FluxString skipFlux Flux.just(one, two, skip a few, ninety nine, one hundred).delayElements(Duration.ofSeconds(1)) // 每1秒一个.skip(Duration.ofSeconds(4)); // 4秒前的都跳过
skipFlux.subscribe(System.out::println);// 阻塞等待结果
Thread.sleep(100000);/*** 执行结果* ninety nine* one hundred*/3take操作只发布第⼀批指定数量的数据项
根据对skip操作的描述来看take可以认为是与skip相反的操作。skip操作会跳过前⾯⼏个数据项⽽take操作只发布第⼀批指定数量的数据项然后将取消订阅。
// take操作只发布传⼊Flux中前⾯指定数⽬的数据项然后将取消订阅
FluxString nationalParkFlux Flux.just(Yellowstone, Yosemite, Grand Canyon,Zion, Grand Teton).take(3);
nationalParkFlux.subscribe(System.out::println);
/*** 执行结果* Yellowstone* Yosemite* Grand Canyon*/4take操作的另一种形式
take()⽅法也有另⼀种替代形式基于间隔时间⽽不是数据项个数在指定的时间过期之前⼀直将消息传递给结果Flux。它将接受并发布与源Flux⼀样多的数据项直到某段时间结束之后Flux将会完成。
// 在订阅之后的前3.5秒发布数据条⽬。
FluxString nationalParkFlux Flux.just(Yellowstone, Yosemite, Grand Canyon,Zion, Grand Teton).delayElements(Duration.ofSeconds(1)).take(Duration.ofMillis(3500));
nationalParkFlux.subscribe(System.out::println);
// 阻塞等待结果
Thread.sleep(100000);
/*** 执行结果* Yellowstone* Yosemite* Grand Canyon*/5filter操作自定义过滤条件
filter操作允许我们根据任何条件进⾏选择性地发布。
FluxString nationalParkFlux Flux.just(Yellowstone, Yosemite, Grand Canyon,Zion, Grand Teton).filter(np - !np.contains( )); // 过滤携带空格的
nationalParkFlux.subscribe(System.out::println);
/*** 执行结果* Yellowstone* Yosemite* Zion*/6distinct操作去重 FluxString animalFlux Flux.just(dog, cat, bird, dog, bird, anteater).distinct();
// 去重
animalFlux.subscribe(System.out::println);
/*** 执行结果* dog* cat* bird* anteater*/7map操作映射新元素
map将元素映射为新的元素并创建一个新的Flux。
// map将元素映射为新的元素并创建一个新的Flux
FluxInteger integerFlux Flux.just(Michael Jordan, Scottie Pippen, Steve Kerr).map(n - {String[] split n.split(\\s);return split.length; // 将String转为Integer});
integerFlux.subscribe(System.out::println);/*** 执行结果* 2* 2* 2*/其中重要的⼀点是在每个数据项被源Flux发布时map操作是同步执⾏的如果你想要异步地转换过程那么你应该考虑使⽤flatMap操作。
8flatMap将流转成新的流
flatMap并不像map操作那样简单地将⼀个对象转换到另⼀个对象⽽是将对象转换为新的Mono或Flux。结果形成的Mono或Flux会扁平化为新的Flux。当与subscribeOn()⽅法结合使⽤时flatMap操作可以释放Reactor反应式的异步能⼒。
// 使⽤flatMap()⽅法和subscribeOn()⽅法
FluxInteger integerFlux Flux.just(Michael, Scottie Pippen, Steve Kerr Ob).flatMap(n - Mono.just(n).map(p - {String[] split p.split(\\s);return split.length; // 将String转为Integer}).subscribeOn(Schedulers.parallel()) // 定义异步);
integerFlux.subscribe(System.out::println);
// 阻塞等待结果
Thread.sleep(100000);9buffer操作现将数据流拆分为小块
buffer操作会产⽣⼀个新的包含列表Flux具备最⼤⻓度限制的列表包含从传⼊的Flux中收集来的数据
// buffer操作会产⽣⼀个新的包含列表Flux具备最⼤⻓度限制的列表包含从传⼊的Flux中收集来的数据
FluxString fruitFlux Flux.just(apple, orange, banana, kiwi, strawberry);
// 创建⼀个新的包含List 集合的Flux其中每个List只有不超过指定数量的元素
FluxListString bufferedFlux fruitFlux.buffer(3); // 数据切分为小块每3个一块
bufferedFlux.subscribe(System.out::println);
/*** 执行结果* [apple, orange, banana]* [kiwi, strawberry]*/
// 可以分片后并行执行
bufferedFlux.flatMap(x -Flux.fromIterable(x).map(y - y.toUpperCase()).subscribeOn(Schedulers.parallel())
).subscribe(l - {System.out.println(Thread.currentThread().getName() 线程执行 l);
});
/*** 执行结果因为并行执行结果可能不一致* parallel-1线程执行APPLE* parallel-1线程执行ORANGE* parallel-1线程执行BANANA* parallel-2线程执行KIWI* parallel-2线程执行STRAWBERRY*/
// 阻塞等待结果
Thread.sleep(100000);使⽤不带参数的buffer()⽅法可以将Flux发布的所有数据项都收集到⼀个List中
FluxListString bufferedFlux fruitFlux.buffer();10collectList操作也可以将所有数据收集到一个List
collectList操作将产⽣⼀个包含传⼊Flux发布的所有消息的Mono。
FluxString fruitFlux Flux.just(apple, orange, banana, kiwi, strawberry);
// 生成一个Mono里面包含一个List
MonoListString fruitListMono fruitFlux.collectList();11collectMap 操作产生⼀个发布Map的Mono
collectMap操作将会产⽣⼀个Mono包含了由传⼊Flux所发出的消息产⽣的Map这个Map的key是从传⼊消息的某些特征衍⽣⽽来的
FluxString animalFlux Flux.just(aardvark, elephant, koala, eagle, kangaroo);
MonoMapCharacter, String animalMapMono animalFlux.collectMap(a - a.charAt(0)); // 将第一个字符作为Map的key
animalMapMono.subscribe(System.out::println);
/*** 执行结果* {aaardvark, eeagle, kkangaroo}*/// 阻塞等待结果
Thread.sleep(100000);key相同的会被覆盖。
6、在反应式类型上执行逻辑操作
1⽤all()⽅法来确保Flux中的所有消息都满⾜某些条件 FluxString animalFlux Flux.just(aardvark, elephant, koala, eagle, kangaroo);
MonoBoolean hasAMono animalFlux.all(a - a.contains(a));都满足条件会返回true否则返回false。
2⽤any()⽅法来确保Flux中⾄少有⼀个消息满⾜某些条件 FluxString animalFlux Flux.just(aardvark, elephant, koala, eagle, kangaroo);
MonoBoolean hasAMono animalFlux.any(a - a.contains(t));至少有一个满足条件就为true都不满足就为false。
7、在反应式类型上使用Subscriber订阅
1使用Subscriber消费消息
FluxString stringFlux Flux.just(Apple, Orange, Grape, Banana, Strawberry);stringFlux.subscribe(new SubscriberString() {// 保存订阅关系, 需要用它来给发布者响应private Subscription subscription;Overridepublic void onSubscribe(Subscription subscription) {System.out.println(订阅者开始订阅);this.subscription subscription;// 请求一个数据this.subscription.request(1);}Overridepublic void onNext(String item) {System.out.println(订阅者开始处理数据 item);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}Overridepublic void onError(Throwable t) {// 出现了异常(例如处理数据的时候产生了异常)t.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println(订阅者处理完了!);}
});
/*** 执行结果* 订阅者开始订阅* 订阅者开始处理数据Apple* 订阅者开始处理数据Orange* 订阅者开始处理数据Grape* 订阅者开始处理数据Banana* 订阅者开始处理数据Strawberry* 订阅者处理完了!*/// 阻塞
Thread.sleep(10000);2使用Flux的doOnNext处理数据
Flux的doOnNext会添加当Flux发出一个项目时触发的行为(副作用)。
FluxString stringFlux Flux.just(Apple, Orange, Grape, Banana, Strawberry);
stringFlux.doOnNext(t - System.out.println(发布者处理数据 t)).subscribe(t - System.out.println(订阅者处理数据 t));
/*** 执行结果* 发布者处理数据Apple* 订阅者处理数据Apple* 发布者处理数据Orange* 订阅者处理数据Orange* 发布者处理数据Grape* 订阅者处理数据Grape* 发布者处理数据Banana* 订阅者处理数据Banana* 发布者处理数据Strawberry* 订阅者处理数据Strawberry*/// 阻塞
Thread.sleep(10000);但是以下写法是不会触发发布者的doOnNext事件的
FluxString stringFlux Flux.just(Apple, Orange, Grape, Banana, Strawberry);
stringFlux.doOnNext(t - System.out.println(发布者处理数据 t));
stringFlux.subscribe(t - System.out.println(订阅者处理数据 t));只有链式调用才会触发发布者的doOnNext事件。
doOnNext可以写多个顺序执行
FluxString stringFlux Flux.just(Apple, Orange, Grape, Banana, Strawberry);
stringFlux.doOnNext(t - System.out.println(发布者1处理数据 t)).doOnNext(t - System.out.println(发布者2处理数据 t)).subscribe(t - System.out.println(订阅者处理数据 t));
/*** 执行结果* 发布者1处理数据Apple* 发布者2处理数据Apple* 订阅者处理数据Apple* 发布者1处理数据Orange* 发布者2处理数据Orange* 订阅者处理数据Orange* 发布者1处理数据Grape* 发布者2处理数据Grape* 订阅者处理数据Grape* 发布者1处理数据Banana* 发布者2处理数据Banana* 订阅者处理数据Banana* 发布者1处理数据Strawberry* 发布者2处理数据Strawberry* 订阅者处理数据Strawberry*/8、使用then来处理完成数据返回
FluxString just Flux.just(Apple, Orange, Grape, Banana, Strawberry);
// 返回一个Mono 在此Flux完成时完成。这将主动忽略序列只重放完成或错误信号。
just.doOnNext(t - System.out.println(发布者处理数据 t)).then(Mono.defer(() - {return Mono.just(我完成了);})).subscribe(t - System.out.println(订阅者处理数据 t));
/*** 执行结果* 发布者处理数据Apple* 发布者处理数据Orange* 发布者处理数据Grape* 发布者处理数据Banana* 发布者处理数据Strawberry* 订阅者处理数据我完成了*/通常来说发布者发布完之后都需要调用then来处理数据或调用thenEmpty返回一个空的MonoMono.empty()。
写在后面
如果本文对你有帮助请点赞收藏关注一下吧 ~