怎么做学校网站和微信公众号,wordpress空间转移,都匀市城乡建设局网站,网站公司未来计划ppt怎么做文章目录 1. 总览1.1 基本原理1.2 导入包和依赖 2. 操作符2.1 创建操作符2.2 转换操作符2.3 组合操作符2.4 功能操作符 1. 总览
1.1 基本原理 参考文献 构建流#xff1a;每一步操作都会生成一个新的Observable节点(没错#xff0c;包括ObserveOn和SubscribeOn线程变换操作… 文章目录 1. 总览1.1 基本原理1.2 导入包和依赖 2. 操作符2.1 创建操作符2.2 转换操作符2.3 组合操作符2.4 功能操作符 1. 总览
1.1 基本原理 参考文献 构建流每一步操作都会生成一个新的Observable节点(没错包括ObserveOn和SubscribeOn线程变换操作)并将新生成的Observable返回直到最后一步执行subscribe方法。编写Rxjava代码的过程其实就是构建一个一个Observable节点的过程 订阅流从最后一个N5节点的订阅行为开始依次执行前面各个节点真正的订阅方法。在每个节点的订阅方法中都会生成一个新的Observer**这个Observer会包含“下游”的Observer这样当每个节点都执行完订阅(subscribeActual)后也就生成了一串Observer它们通过downstream,upstream引用连接 回调流 当订阅流执行到最后也就是第一个节点N0时用onNext方法两个作用一个是把上个节点返回的数据进行一次map变换另一个就是将map后的结果传递给下游。 小结先从上到下把各个变换的Observable连成链拼装流水线然后在最后subscribe的时候又从下到上通过每个Observable的OnSubscribe从最下的Subscriber对象开始连成链流水线开始工作包装Subscriber直到顶端当顶端的Subscriber对象调用了onNext方法的时候又从上往下调用Subscriber链的onNext(用户一层层拆开包装盒)里面执行了每个操作的变换逻辑。
1.2 导入包和依赖
implementation io.reactivex.rxjava2:rxjava:2.2.21
implementation io.reactivex.rxjava2:rxandroid:2.1.12. 操作符
添加链接描述
2.1 创建操作符 Create private void test1() {//被观察者Observable观察者Observer/消费者consumer通过subsribe订阅Observable.create(new ObservableOnSubscribeObject() {Overridepublic void subscribe(ObservableEmitterObject emitter) throws Exception {emitter.onNext(1);
// emitter.onError(new Throwable(异常模拟));emitter.onComplete();}}).subscribe(new ObserverObject() {Overridepublic void onSubscribe(Disposable d) {System.out.println(subscribe);}Overridepublic void onNext(Object o) {System.out.println(onNext Observer o);}Overridepublic void onError(Throwable e) {System.out.println(erro);}Overridepublic void onComplete() {System.out.println(Complete Observer....);}});}private void test2() {Disposable d Observable.create(new ObservableOnSubscribeObject() {Overridepublic void subscribe(ObservableEmitterObject emitter) throws Exception {emitter.onNext(2);emitter.onError(new Throwable(模拟异常));emitter.onComplete();}}).subscribe(new ConsumerObject() {Overridepublic void accept(Object o) throws Exception {System.out.println(Accept o);}}, new ConsumerThrowable() {Overridepublic void accept(Throwable throwable) throws Exception {System.out.println(Accept throwable);}});}Observer: 适合需要完整事件处理的场景包括处理数据、错误和完成信号。 提供了更灵活的事件处理能力可以根据需求实现对错误和完成事件的响应。 Consumer: 适合简单的场景只需处理每个发出的数据项而不需要关心错误或完成事件。 简化了代码结构特别是在处理简单流时使用起来更为便捷和直观。
其他 just 10个发射源 from 将一个Iterable、一个Future、 或者一个数组内部通过代理的方式转换成一个Observable interval操作符 创建一个按固定时间间隔发射整数序列的Observable这个序列为一个无限递增的整数序列 range操作符 发射一个范围内的有序整数序列并且我们可以指定范围的起始和长度 repeat操作符 重复发射原始Observable的数据序列这个序列或者是无限的或者通过repeat(n)指定重复次数
2.2 转换操作符
map 将源Observable发送的数据转换为一个新的Observable对象 private void test3(){Observable.just(111).map(new FunctionString, Object() {Overridepublic Object apply(String s) throws Exception {return my name is s;}}).subscribe(ob);}//subscribe
//onNext Observer my name is 111
//Complete Observer....flatmap 添加链接描述 将一个发送事件的上游Observable变换为多个发送事件的Observables然后将它们发射的事件合并后放进一个单独的Observable里(但是是无序的) private void test4(){Disposable ob Observable.create(new ObservableOnSubscribeInteger() {Overridepublic void subscribe(ObservableEmitterInteger emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);}}).flatMap(new FunctionInteger, ObservableSourceString() {Overridepublic ObservableSourceString apply(Integer o) throws Exception {final ListString list new ArrayList();for (int i 0; i 3; i) {list.add(I am value o);}return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);//为了无序 加了延迟}}).subscribe(new ConsumerString() {Overridepublic void accept(String o) throws Exception {System.out.println(o);}});}//出现的 1 2 3会随机出现concatMap concatMap操作符类似于flatMap操作符不同的一点是它按次序连接。
2.3 组合操作符
concat concatArray 合并多个对象按照一定的顺序 merge
2.4 功能操作符
SubscribeOn 改变调用它之前代码的线程只有第一次有效 ObserveOn 改变调用它之后代码的线程 可以多次调用 Observable.create(new ObservableOnSubscribeObject() {Overridepublic void subscribe(ObservableEmitterObject emitter) throws Exception {Log.d(TAG,加了subscribeOn和observeOn Thread.currentThread().getName());emitter.onNext(1111);emitter.onNext(22222);emitter.onComplete();}}).subscribeOn(Schedulers.newThread()) //1 进行创建和发射在子线程.observeOn(AndroidSchedulers.mainThread())// 2 在主线程消费;由于程序是test里面执行所以不是main线程后续改成了main是一样的道理.subscribe(new ObserverObject() {Overridepublic void onSubscribe(Disposable d) {Log.d(TAG,onSubscribe Thread.currentThread().getName());}Overridepublic void onNext(Object o) {Log.d(TAG,onNext Thread.currentThread().getName());}Overridepublic void onError(Throwable e) {Log.d(TAG,onError Thread.currentThread().getName());}Overridepublic void onComplete() {Log.d(TAG,onComplete Thread.currentThread().getName());}});}这一个onSubsribe 一直是在测试线程里 1. **Observable 的创建和订阅**:- 在 subscribe() 方法中你创建了一个 Observer 对象并将其订阅到了 Observable 对象上。2. **onSubscribe 方法执行**:- 当 subscribe() 方法被调用后Observer 对象的 onSubscribe 方法会立即执行。这是因为 onSubscribe 是 Observer 接口的一部分它负责接收 Disposable 对象表示订阅关系而不是响应数据流本身。3. **异步操作执行**:- 然后Observable 中的异步操作开始执行。在你的例子中通过 Observable.create() 创建了一个新的数据流该数据流会在新线程通过 subscribeOn(Schedulers.newThread()) 指定的线程中执行。这意味着 Observable.create() 中的代码块会在新线程中运行而不会阻塞主线程。4. **数据流发射和消费**:- 在新线程中ObservableEmitter 会发射数据项通过 emitter.onNext() 发送数据并在合适的时机调用 onComplete() 或者 onError()表示数据流的结束。5. **observeOn 切换到主线程**:- 通过 observeOn(AndroidSchedulers.mainThread())确保在数据流中的消费者部分即 Observer 的 onNext(), onError(), onComplete() 方法在主线程中执行。这个切换保证了在主线程更新UI或处理数据从而避免了在主线程中执行耗时操作而导致的UI阻塞问题。