【Programming】RxJava リアクティブプログラミング vol.4 / RxJavaの構成~後編~
前回はRxJavaの構成についてまとめました(記事はこちら)。今回も引き続きRxJavaの構成をまとめてみます。
1.4.8 subscribe メソッド/ subscribeWith メソッド
1.4.8.1 subscribeメソッド
- subscribeメソッドとは生産者に対し消費者が購読するメソッド、つまり生産者がデータを通知する消費者を登録する行為
- 生産者が「Cold」の場合、subscribeメソッドを呼ぶと、生産者は処理を開始し通知処理を始める。
- 購読は次の3つの手順で通知処理を行う。
- 購読開始時の初期化処理(onSubscribe)
- データの通知処理(onNext)
- 通知の終了処理(onComplete/onError)
- RxJavaで注意が必要なのは、SubscriberのonSubscribeメソッド内でrequestメソッドを呼び出すと、最初にrequestメソッドを呼び出したタイミングで通知処理を開始する。そのため、onSubscribeメソッド内でrequestメソッドを呼ぶのは全ての処理を呼び出した後で、他に何も処理がない状態で呼びだす必要がある。
- Reactive Streamsの仕様では、subscribeメソッドの引数にSubscriberおよびObserverを渡す場合、subscribeメソッドからの戻り値はないが、RxJavaが用意しているsubscribeメソッドには、、購読を解除するためのDisposableを返すsubscribeメソッドも用意されている。
戻り値の型 | メソッド | 説明 |
---|---|---|
Disposable | subscribe() | Flowable/Observableの処理だけ行いSubscriber/Observerは何もしない |
Disposable | subscribe(Consumer onNext) | データの通知(onNext)を受け取った時の処理のみ引数で定 義してあるように行う |
Disposable | subscribe( Consumer onNext, Consumer onError) | データの通知(onNext)とエラーの通知(onError)を受け取った時の処理のみ引数で定義してあるように行う |
Disposable | subscribe( Consumer onNext, Consumer onError, Action onComplete) | データの通知(onNext)とエラーの通知(onError)と完了の通知(onComplete)を受け取った時の処理のみ引数で定義してあるように行う |
Disposable | subscribe( Consumer onNext, Consumer onError, Action onComplete, Consumer onSubscribe) | データの通知(onNext)とエラーの通知(onError)と完了の通知(onComplete)を受け取った時の処理を引数で定義してあるように行い、さらに購読開始時の処理(onSubscribe)も定義してあるように行う |
※subscribeメソッドでは、デフォルトでリクエストするデータ数にはLong.MAX_VALUEが設定されている。
※エラー時の処理を指定していない場合は、エラーの通知を受けてもスタックトレースを出力するだけでそれ以外は処理しない。
- Disposableを使って購読を解除する例
// 購読を開始する。
Disposable disposable = flowable.subscribe(data -> System.out.println("data=" + data));
// 購読を解除する
disposable.dispose();
1.4.8.2 subscribeWidthメソッド
RxJava2.xではsubscribeメソッドに加え、新たに購読を行うためのメソッドとして、Subscriber/Observerを引数に取り、戻り値も返すsubscribeWithメソッドがある。
引数にSubscriber/Observerを渡すと内部でそのSubscriber/Observerをsubscribeメソッドに渡して実行し、戻り値としてその引数となったSubscriber/Observerを返す。
- subscribeWith メソッドの実装(Flowable の場合)
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
subscribe(subscriber);
return subscriber;
}
- 引数にResourceSubscriber/ResourceObserverやDisposableSubscriber/DisposableObserverなどのDisposableを実装しているSubscriber/Observerを渡すことで戻り値としてDisposableを受け取ることができる
- RxJava 1.xのように購読後に購読解除が行えるDisposableを取得し、Subscriber/Observerの外部から購読の解除を行うことが可能。
Disposable disposable = flowable.subscribeWith(new ResourceSubscriber(){
// 中略
});
1.4.8.3 observeOn メソッド
- RxJavaでは、データを通知する側と受け取る側とで、処理を別々のスレッドで実行させる場合、observeOnメソッドの引数にSchedulerというスレッド管理を行うオブジェクトを設定することで、データを受け取る側がどのようなスレッド上で処理を行うのかを指定することができるようになる。
- 同じスレッド上でデータを通知する側と処理する側の処理が行われる場合、Flowableがデータを通知した後に、そのデータを受け取ったSubscriberの処理が終わるまで、Flowableは次のデータを通知するのを待つことになる。
1.4.9 CompositeDisposable
CompositeDisposableは複数のDisposableをまとめることで、CompositeDisposableのdisposableのメソッドを呼ぶことで保持している全てのDisposableのdisposeメソッドを呼ぶことができる。
- CompositeDisposableを使って2つの購読を途中で解除するサンプル
public static void main(String[] args) throws Exception {
// Disposableをまとめる
CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(Flowable.range(1, 3)
.doOnCancel(() -> System.out.println("No.1 canceled"))
.observeOn(Schedulers.computation())
.subscribe(data -> {
Thread.sleep(100L);
System.out.println("No.1: " + data);
}));
compositeDisposable.add(Flowable.range(1, 3)
.doOnCancel(() -> System.out.println("No.2 canceled"))
.observeOn(Schedulers.computation())
.subscribe(data -> {
Thread.sleep(100L);
System.out.println("No.2: " + data);
}));
// しばらく待つ
Thread.sleep(150L);
// まとめて購読の解除を行う
compositeDisposable.
1.4.10 Single/Maybe/Completable
- RxJavaには生産者となるFlowableとObservableに加え、通知を行うクラスとして、Single、Maybe、Completableというクラスが存在する。
クラス | 説明 |
---|---|
Single | データを1件だけ通知するか、もしくはエラーを通知するクラス |
Maybe | データを1件だけ通知するか、1件も通知せず完了を通知するか、もしくはエラーを通知するクラス |
Completable | データを1件も通知せず完了を通知するか、もしくはエラーを通知するクラス |
- これらのクラスはFlowableやObservableのようにデータの通知と完了の通知が分かれているのではなく、データの通知でそのまま完了を意味した
り、データの通知なしで完了の通知のみを行うようになる。 - そのため、それぞれのクラスの通知に対応する消費者が必要になり、通常のSubscriberやObserverが使えず、以下の独自の消費者が用意されている。
- 以下の消費者にはDisposableObserverやResourceObserverに該当する実装クラスが用意されている。
生産者 | 消費者 |
---|---|
Single | SingleObserver |
Maybe | MaybeObserver |
Completable | CompletableObserver |
1.4.10.1 Single
Singleはデータを1件だけ通知するか、もしくはエラーを通知するクラス
データを通知することは処理が完了したことも意味するので、完了の通知は存在しない。
Singleが持つ通知プロトコルにonNextとonCompleteはなく、onSuccessという1件のデータを通知し完了したことを意味する通知プロトコルが用意されている。
データが1件しか通知されないので、データ数をリクエストする必要がない。
- Singleからの通知を受け取るSingleObserverでは、次のメソッドが定義されている。
メソッド | 説明 |
---|---|
onSubscribe(Disposable disposable) | 通知の準備ができたら呼ばれるメソッド。引数に購読の解除を行えるDisposableを受け取る |
onSuccess(T data) | データを受け取り処理を行うメソッド。これ以降のデータの通知はないので、onSuccessメソッドが呼ばれることはSingleの処理が完了したことを意味する |
onError(Throwable error) | 通知処理を行っている間にエラーが発生したら呼ばれるメソッド。 引数に発生したエラーのオブジェクトが渡される |
- Singleのサンプル
public static void main(String[] args) {
// Singleの作成
Single<DayOfWeek> single = Single.create(emitter ->
emitter.onSuccess(LocalDate.now().getDayOfWeek());
});
// 購読する
single.subscribe(new SingleObserver<DayOfWeek>() {
// 購読の準備ができた際の処理を行う
@Override
public void onSubscribe(Disposable disposable) {
// 何もしない
}
// データの通知を受け取った際の処理を行う
@Override
public void onSuccess(DayOfWeek value) {
System.out.println(value);
}
// エラーの通知を受け取った際の処理を行う
@Override
public void onError(Throwable e) {
System.out.println("エラー=" + e);
}
});
}
1.4.10.2 Maybe
Maybeはデータをデータを1件だけ通知するか、1件も通知せず完了を通知するか、もしくはエラーを通知するクラス
Maybeではデータを通知することは処理が完了したことも意味するので、あえて再び完了の通知を行わない。
Maybeが完了の通知を行う場合は、データが1件もなく処理が正常に終了した場合となる。
Maybeの完了の通知(onComplete)は、正常に処理が終了した際に、必ず呼ばれるわけではない
- Maybeからの通知を受け取るMaybeObserverでは、次のメソッドが定義されている。
メソッド | 説明 |
---|---|
onSubscribe(Disposable disposable) | 通知の準備ができたら呼ばれるメソッド。引数に購読の解除を行えるDisposableを受け取る |
onSuccess(T data) | データを受け取り処理を行うメソッド。これ以降のデータの通知はないので、onSuccessメソッドが呼ばれることは、Maybeの処理が完了したことを意味し、onCompleteは呼ばれない |
onComplete | データを通知することなく、Maybeの処理が完了した際に実行されるメソッド |
onError(Throwable error) | 通知処理を行っている間にエラーが発生したら呼ばれるメソッド。 引数に発生したエラーのオブジェクトが渡される |
- Maybeのサンプル
public static void main(String[] args) {
// Maybeの作成
Maybe<DayOfWeek> maybe = Maybe.create(emitter -> {
emitter.onSuccess(LocalDate.now().getDayOfWeek());
});
// 購読する
maybe.subscribe(new MaybeObserver<DayOfWeek>() {
// 購読の準備ができた際の処理を行う
@Override
public void onSubscribe(Disposable disposable) {
// 何もしない
}
// データの通知を受け取った際の処理を行う
@Override
public void onSuccess(DayOfWeek value) {
System.out.println(value);
}
// 完了の通知を受け取った際の処理を行う
@Override
public void onComplete() {
System.out.println("完了");
}
// エラーの通知を受け取った際の処理を行う
@Override
public void onError(Throwable e) {
System.out.println("エラー=" + e);
}
});
1.4.10.3 Completable
Completableはデータを通知することなく完了を通知するか、もしくはエラーを通知するクラス
他の生産者となるクラスと異なり、データを通知することはない
Completableは他の生産者となるクラスと役割が異なり、Completable内で何らかの副作用が発生する処理を行う。
処理が終わった場合に完了の通知を行い、エラーが発生したらエラーの通知を行う。
Completableを使う場合は、その処理の購読を呼び出しているスレッドと異なるスレッド上で行わせないと、RxJavaを使わない場合と同じ処理内容になり、Completableを使う意味がなくなる。
- Completableからの通知を受け取るCompletableObserverでは、次のメソッドが定義されている。
メソッド | 説明 |
---|---|
onSubscribe(Disposable disposable) | 通知の準備ができたら呼ばれるメソッド。引数に購読の解除を行えるDisposableを受け取る |
onComplete | Completableの処理が完了した際に実行されるメソッド |
onError(Throwable error) | 通知処理を行っている間にエラーが発生したら呼ばれるメソッド。引数に発生したエラーのオブジェクトが渡される |
- Completableからの通知を受け取るCompletableObserverでは、次のメソッドが定義されている。のサンプル
public static void main(String[] args) throws Exception {
// Completableの作成
Completable completable = Completable.create(emitter ->
// …略 何らかの処理を行う
// 完了を通知する
emitter.onComplete();
});
completable
// Completableを非同期で行う
.subscribeOn(Schedulers.computation())
// 購読する
.subscribe(new CompletableObserver() { // ❸
// 購読の準備ができた際の処理を行う
@Override
public void onSubscribe(Disposable disposable) {
// 何もしない
}
// 完了の通知を受け取った際の処理を行う
@Override
public void onComplete() {
System.out.println("完了");
}
// エラーの通知を受け取った際の処理を行う
@Override
public void onError(Throwable e) {
System.out.println("エラー=" + e);
}
});
// しばらく待つ
Thread.sleep(100L);
}
1.4.11 RxJava の拡張モジュール
RxJavaは軽量化されているため基本的には必要最低限の機能しか持たない
次のモジュールはRxJavaの拡張として用意されているもの
- RxJavaString:文字列の操作を行うRxJava拡張モジュール。InputStreamやReaderからFlowable/Observableを生成できたりもする
- RxJavaFileUtils:File関連処理を行うRxJava拡張モジュール
- RxJavaMath:数学関連の処理を行うRxJava拡張モジュール
- RxJavaJoins:複数のObservableを使う処理を行うRxJava拡張モジュール
- RxJavaAsyncUtil:非同期処理ユーティリティのRxJava拡張モジュール
Android用としては次のモジュールがある。
- RxAndroid:Androidで使うスレッドを管理するSchedulerを持つRxJava拡張モジュール
- RxLifecycle: AndroidのActivityやFragmentをRxJavaのFlowableやObservableのライフサイクルを同期するためのRxJava拡張モジュール
- RxBinding: Androidの各部品のイベントをRxJavaを連携するRxJava拡張モジュール。
written by tamito0201
プログラミングとのご縁結びならプロマリへ。
オンラインプログラミング学習スクールのプロマリは、プログラミングの初学者の皆様を応援しています。プログラミング講師と一緒に面白いアプリを作りませんか。
The programming school "Promari" will help you learn programming. "Promari" is supporting the first scholars of programming. Let's develop an application with our programming instructor.