ityWeathData方法返回出来一个泛型为WeathData的Observable:
/**
* 获取一个城市的天气数据
* @param city
* @return
*/
private Observable<WeathData> getCityWeathData(final String city){
return Observable.just(city)
.map(new Func1<String, WeathData>() {
@Override
public WeathData call(String s) {
//通过网络请求获取城市的天气数据
WeathData weathData = new WeathData();
weathData.city = city ;
weathData.state = "晴天" ;
return weathData ;
}
});
}
6、zip
//zip 将两个Observable按照规则严格的合成一个Observable
Observable<Integer> observable1 = Observable.just(10, 20, 30,40);
Observable<Integer> observable2 = Observable.just(1, 2, 3,4);
Observable.zip(observable1, observable2, new Func2<Integer, Integer, String>() {
@Override
public String call(Integer integer, Integer integer2) {
//定义合并规则
return integer + integer2 + "abc";
}
}).subscribe(new Action1<String>() {
@Override
public void call(String string) {
Log.i(TAG,"call:"+string) ;
}
});
7、zipWith
//zipwith 将本身与其他的Observable按照规则严格的合并成一个Observable
Observable.just(10,20,30,40)
.zipWith(Observable.just("a", "b", "c"), new Func2<Integer, String, String>() {
@Override
public String call(Integer integer, String s) {
//合并规则
return integer + s ;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG,"call:"+s) ;
}
});
8、retry
//retry 在出错的时候重试(异常的时候重新执行)
//用处:网络连接异常的时候
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
try {
for (int i = 0; i < 5; i++) {
if (i == 3) {
throw new Exception("出错了");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}catch (Exception e){
subscriber.onError(e);
}
}
}).retry(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.i(TAG,"onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i(TAG,"onError:"+e.getMessage()) ;
}
@Override
public void onNext(Integer integer) {
Log.i(TAG,"onNext:"+integer) ;
}
});
9、retryWhen
//retrywhen 异常的时候执行
//网络请求框架中,一般使用retryWhen 要执行操作是连接网络,连接出异常的时候,
// 1、我们可以直接重复执行连接网络,retry
// 2、同时我们也可以判断连接异常的类型,再做决定是否重连 retyrWhen
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.i(TAG,"总出错");
subscriber.onError(new Throwable("出错了"));
}
}).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer integer) {
return integer;
}
}).flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
//timer 延迟执行的操作符
Log.i(TAG,"延迟"+integer+"秒");
return Observable.timer(integer, TimeUnit.SECONDS);
}
});
}
}).subs