RxJava
目录
RxJava最核心的东西就是Observables和Subscribers。Observables发出一系列事件,Subscribers处理这些事件。如果一个Observables没有任何的Subscribers,那么这个Observables是不会发出任何事件来。
1.创建Observable
RxJava提供了多种创建Observables的方法。create方法创建一实例需要传入一个OnSubscribe,当Subscriber订阅时进行调用。
创建Subscriber
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
};创建Observable
Observable<String> myObservable=Observable.create(
new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
System.out.println(subscriber);
//rx.observers.SafeSubscriber@3e4cefce
subscriber.onNext("hello,world!");
}
}
);调用Observable的subscriber方法与Subscrible进行关联
// 通过Observable的subscribe方法完成subscriber对observable的订阅
myObservable.subscribe(mySubscriber);输出
hello,world!Observable的just用来创建只发出一个事件就结束的Observable对象。
Observable的from方法,接受一个集合作为输入,然后每次输出一个元素给Subscriber。
Observable.from(new Integer[]{1,2,3,4,5})
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer/2==0;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});2.操作符
操作符就是为了解决对Observable对象的变换问题,操作符用于在Observable和最终的Subscriber之间修改Observable发出的事件。RxJava提供了很多很有用的操作符。
map操作符,就是用来把一个事件转换为另一个事件的。
Observable.just("hello,")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + "Dan";
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});也可以使用map操作符返回一个发出新的数据类型的Observable对象。
flatMap可以返回一个Observable对象。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("create--->"+subscriber);
//rx.internal.operators.OperatorMap$1@c10d39
subscriber.onNext(10);
}
}).flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(final Integer integer) {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("flatMap--->"+subscriber);
// rx.internal.operators.OperatorMerge$InnerSubscriber@2b3bc58a
subscriber.onNext(integer * integer);
}
});
}
}).subscribe(mySubscriber2);Subscriber<Integer> mySubscriber2 = new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
};3.错误处理
4.调度器
RxJava可以使用Observable的subscribeOn方法指定观察者代码运行的线程。
5.订阅
当调用Observable.subscribe()会返回一个Subscription对象。这个对象代表了观察者和订阅者之间的联系。
//是否解除订阅
subscription.isUnsubscribed()
//解除订阅
subscription.unsubscribe();参考
最后更新于