Rxjava-quick-guide
RxJava-概要
RxJavaは、ReactiveXのJavaベースの拡張機能です。 Javaでの実装またはReactiveXプロジェクトを提供します。 RxJavaの主な特徴は次のとおりです。
- オブザーバーパターンを拡張します。
- データ/イベントのシーケンスをサポートします。
- シーケンスを宣言的に一緒に構成する演算子を提供します。
- スレッド化、同期、スレッドセーフ、および並行データ構造を内部的に処理します。
ReactiveXとは何ですか?
ReactiveXは、さまざまなプログラミング言語にリアクティブプログラミングの概念を提供することを目的としたプロジェクトです。 リアクティブプログラミングとは、データが表示されたときにプログラムが反応するシナリオを指します。 これはイベントベースのプログラミングコンセプトであり、イベントはレジスタオブザーバに伝播できます。
*Reactive* に従って、Observerパターン、Iteratorパターン、および機能パターンのベストを組み合わせています。
正しく行われたオブザーバーパターン。 ReactiveXは、Observerパターン、Iteratorパターン、および関数型プログラミングの最高のアイデアを組み合わせたものです。
関数型プログラミング
関数型プログラミングは、純粋な関数を使用してソフトウェアを構築することを中心に展開します。 純粋な関数は以前の状態に依存せず、渡された同じパラメーターに対して常に同じ結果を返します。 純粋な関数は、共有オブジェクト、可変データ、およびマルチスレッド環境でよく見られる副作用に関連する問題の回避に役立ちます。
リアクティブプログラミング
リアクティブプログラミングとは、データストリームが非同期的に到着し、到着時に処理されるイベントドリブンプログラミングのことです。
機能的リアクティブプログラミング
RxJavaは両方の概念を一緒に実装し、ストリームのデータは時間とともに変化し、それに応じてコンシューマー関数が反応します。
リアクティブマニフェスト
Reactive Manifestoは、高水準のアプリケーションソフトウェアシステムを示すオンラインドキュメントです。 マニフェストに従って、以下はリアクティブソフトウェアの重要な属性です-
- 応答-常にタイムリーに応答する必要があります。
- メッセージドリブン-疎結合を維持するために、コンポーネント間で非同期メッセージパッシングを使用する必要があります。
- 弾性-高負荷下でも応答性を維持する必要があります。
- 弾力性-コンポーネントに障害が発生した場合でも、応答性を維持する必要があります。
RxJavaの主要コンポーネント
RxJavaには、ObservablesとObserverの2つの主要コンポーネントがあります。
- Observable -ゼロ以上のデータを送信できるStreamに類似したオブジェクトを表し、エラーメッセージを送信でき、そのデータのセットを送信しながら速度を制御でき、有限および無限のデータを送信できます。
- オブザーバー-オブザーバブルのシーケンスのデータをサブスクライブし、オブザーバブルのアイテムごとに反応します。 Observableがデータを送信するたびに、オブザーバーに通知されます。 オブザーバーはデータを1つずつ処理します。
アイテムが存在しないか、前のアイテムのコールバックが返されない場合、オブザーバーに通知されません。
RxJava-環境設定
ローカル環境のセットアップ
RxJavaはJavaのライブラリであるため、最初の要件はJDKをマシンにインストールすることです。
システム要件
JDK | 1.5 or above. |
Memory | No minimum requirement. |
Disk Space | No minimum requirement. |
Operating System | No minimum requirement. |
手順1-マシンでのJavaインストールの検証
まず、コンソールを開き、作業しているオペレーティングシステムに基づいてjavaコマンドを実行します。
OS | Task | Command |
---|---|---|
Windows | Open Command Console | c:\> java -version |
Linux | Open Command Terminal | $ java -version |
Mac | Open Terminal | machine:< joseph$ java -version |
すべてのオペレーティングシステムの出力を確認しましょう-
OS | Output |
---|---|
Windows |
java version "1.8.0_101" Java(TM)SEランタイム環境(ビルド1.8.0_101) |
Linux |
java version "1.8.0_101" Java(TM)SEランタイム環境(ビルド1.8.0_101) |
Mac |
java version "1.8.0_101" Java(TM)SEランタイム環境(ビルド1.8.0_101) |
システムにJavaがインストールされていない場合は、次のリンクからJavaソフトウェア開発キット(SDK)をダウンロードしてください。https://www.oracle.com/technetwork/java/javase/downloads/indexl [https://www .oracle.com]。 このチュートリアルのインストールバージョンとしてJava 1.8.0_101を想定しています。
ステップ2-JAVA環境の設定
*JAVA_HOME* 環境変数を設定して、Javaがマシンにインストールされているベースディレクトリの場所を指すようにします。 例えば。
OS | Output |
---|---|
Windows | Set the environment variable JAVA_HOME to C:\Program Files\Java\jdk1.8.0_101 |
Linux | export JAVA_HOME =/usr/local/java-current |
Mac | export JAVA_HOME =/Library/Java/Home |
Javaコンパイラの場所をシステムパスに追加します。
OS | Output |
---|---|
Windows | Append the string C:\Program Files\Java\jdk1.8.0_101\bin *at the end of the system variable, Path*. |
Linux | export PATH = $PATH:$JAVA_HOME/bin/ |
Mac | not required |
上記の説明に従って、コマンド java -version を使用してJavaのインストールを確認します。
ステップ3-RxJava2アーカイブのダウンロード
RxJava @ MVNRepositoryおよびその依存関係https://mvnrepository.com/artifact/org.reactivestreams/reactive-からRxJava jarファイルの最新バージョンをダウンロードします。 streams [Reactive Streams @ MVNRepository] . このチュートリアルを書いている時点で、rxjava-2.2.4.jar、reactive-streams-1.0.2.jarをダウンロードし、C:\> RxJavaフォルダーにコピーしました。
OS | Archive name |
---|---|
Windows | rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
Linux | rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
Mac | rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
ステップ4-RxJava環境の設定
*RX_JAVA* 環境変数を設定して、RxJava jarがマシンに保存されているベースディレクトリの場所を指すようにします。 RxJavaフォルダーにrxjava-2.2.4.jarとreact-streams-1.0.2.jarを保存したと仮定しましょう。
Sr.No | OS & Description |
---|---|
1 |
Windows 環境変数RX_JAVAをC:\ RxJavaに設定します |
2 |
Linux エクスポートRX_JAVA =/usr/local/RxJava |
3 |
Mac エクスポートRX_JAVA =/Library/RxJava |
ステップ5-CLASSPATH変数の設定
*CLASSPATH* 環境変数を設定して、RxJava jarの場所を指すようにします。
Sr.No | OS & Description |
---|---|
1 |
Windows 環境変数CLASSPATHを%CLASSPATH%;%RX_JAVA%\ rxjava-2.2.4.jar;%RX_JAVA%\ reactive-streams-1.0.2.jar;。;に設定します。 |
2 |
Linux export CLASSPATH = $ CLASSPATH:$ RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar :. |
3 |
Mac export CLASSPATH = $ CLASSPATH:$ RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar :. |
ステップ6-RxJavaセットアップのテスト
以下に示すようにクラスTestRx.javaを作成します-
import io.reactivex.Flowable;
public class TestRx {
public static void main(String[] args) {
Flowable.just("Hello World!").subscribe(System.out::println);
}
}
手順7-結果の確認
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac Tester.java
出力を確認します。
Hello World!
RxJava-Observableの仕組み
*Observables* は、* Observers(サブスクライバー)*がリッスンするデータのソースを表します。 一言で言えば、Observableはアイテムを発行し、Subscriberはこれらのアイテムを消費します。
観測可能
- Observableは、サブスクライバがリスニングを開始するとデータを提供します。
- Observableは任意の数のアイテムを放出できます。
- Observableは、アイテムがなくても完了のシグナルのみを送信できます。
- Observableは正常に終了できます。
- Observableが終了することはありません。 e.g. ボタンは何度でもクリックできます。
- Observableは、いつでもエラーをスローする場合があります。
加入者
- Observableは複数のサブスクライバーを持つことができます。
- Observableがアイテムを発行すると、各サブスクライバーのonNext()メソッドが呼び出されます。
- Observableがアイテムの発行を完了すると、各サブスクライバーのonComplete()メソッドが呼び出されます。
- Observableからエラーが発生した場合、各サブスクライバーのonError()メソッドが呼び出されます。
RxJava-オブザーバブルの作成
以下は、オブザーバブルを作成するための基本クラスです。
- Flowable -0..Nフロー、0またはnアイテムを放出します。 リアクティブストリームとバックプレッシャーをサポートします。
- Observable -0..Nのフローですが、背圧はありません。
- Single -1アイテムまたはエラー。 メソッド呼び出しのリアクティブバージョンとして扱うことができます。
- 完了-アイテムは放出されません。 完了またはエラーのシグナルとして使用されます。 Runnableのリアクティブバージョンとして扱うことができます。
- MayBe -アイテムなし、または1アイテムが放出されました。 Optionalの事後対応バージョンとして扱うことができます。
以下は、Observableクラスでオブザーバブルを作成する便利なメソッドです。
- * just(T item)*-指定された(定数参照)アイテムを通知するObservableを返し、完了します。
- * fromIterable(Iterable source)*-Iterableシーケンスを、シーケンス内のアイテムを放出するObservableSourceに変換します。
- * fromArray(T … items)*-配列内のアイテムを出力するObservableSourceに配列を変換します。
- * fromCallable(Callable supplier)*-Observableを返します。オブザーバーがサブスクライブすると、指定した関数を呼び出し、その関数から返された値を発行します。
- * fromFuture(Future future)*-FutureをObservableSourceに変換します。
- * interval(long initialDelay、long period、TimeUnit unit)*-initialDelayの後に0Lを出力し、その後の各期間の後に数値を増加させるObservableを返します。
RxJava-単一の観測可能
Singleクラスは、単一値の応答を表します。 単一の監視可能オブジェクトは、単一の成功値またはエラーのいずれかのみを出力できます。 onCompleteイベントは発生しません。
クラス宣言
以下は、 io.reactivex.Single <T> クラスの宣言です-
public abstract class Single<T>
extends Object
implements SingleSource<T>
プロトコル
以下は、Single Observableが動作する順次プロトコルです-
onSubscribe (onSuccess | onError)?
単一の例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import java.util.concurrent.TimeUnit;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create the observable
Single<String> testSingle = Single.just("Hello World");
//Create an observer
Disposable disposable = testSingle
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(
new DisposableSingleObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
});
Thread.sleep(3000);
//start observing
disposable.dispose();
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
Hello World
RxJava-MayBe Observable
MayBeクラスは、遅延応答を表します。 MayBe observableは、単一の成功値を出力することも、値を出力しないこともできます。
クラス宣言
以下は、 io.reactivex.Single <T> クラスの宣言です-
public abstract class Maybe<T>
extends Object
implements MaybeSource<T>
プロトコル
以下は、MayBe Observableが動作するシーケンシャルプロトコルです-
onSubscribe (onSuccess | onError | OnComplete)?
MayBeの例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import java.util.concurrent.TimeUnit;
import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create an observer
Disposable disposable = Maybe.just("Hello World")
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
//start observing
disposable.dispose();
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
Hello World
RxJava-完全な観察可能
Completableクラスは、遅延応答を表します。 Completable observableは、正常終了またはエラーを示す場合があります。
クラス宣言
以下は、 io.reactivex.Completable クラスの宣言です-
public abstract class Completable
extends Object
implements CompletableSource
プロトコル
以下はCompletable Observableが動作する順次プロトコルです-
onSubscribe (onError | onComplete)?
完成可能な例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import java.util.concurrent.TimeUnit;
import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create an observer
Disposable disposable = Completable.complete()
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableCompletableObserver() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onStart() {
System.out.println("Started!");
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
//start observing
disposable.dispose();
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
Started!
Done!
RxJava-CompositeDisposableの使用
CompositeDisposableクラスは、複数の使い捨てを保持できるコンテナを表し、使い捨てを追加および削除するO(1)の複雑さを提供します。
クラス宣言
以下は、 io.reactivex.disposables.CompositeDisposable クラスの宣言です-
public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer
CompositeDisposableの例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
CompositeDisposable compositeDisposable = new CompositeDisposable();
//Create an Single observer
Disposable disposableSingle = Single.just("Hello World")
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(
new DisposableSingleObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
});
//Create an observer
Disposable disposableMayBe = Maybe.just("Hi")
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
compositeDisposable.add(disposableSingle);
compositeDisposable.add(disposableMayBe);
//start observing
compositeDisposable.dispose();
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
Hello World
Hi
RxJava-オペレーターの作成
以下は、Observableを作成するために使用される演算子です。
Sr.No. | Operator & Description |
---|---|
1 |
Create Observableをゼロから作成し、オブザーバーメソッドがプログラムで呼び出すことを許可します。 |
2 |
Defer オブザーバーがサブスクライブするまでObservableを作成しないでください。 各オブザーバーの新しいオブザーバブルを作成します。 |
3 |
Empty/Never/Throw 動作が制限されたObservableを作成します。 |
4 |
From オブジェクト/データ構造をObservableに変換します。 |
5 |
Interval 指定された時間間隔のギャップで、整数を連続して放出するObservableを作成します。 |
6 |
Just オブジェクトまたはデータ構造をObservableに変換して、同じまたは同じタイプのオブジェクトを発行します。 |
7 |
Range 指定された範囲の順序で整数を出力するObservableを作成します。 |
8 |
Repeat 整数を繰り返し放出するObservableを作成します。 |
9 |
Start 関数の戻り値を発行するObservableを作成します。 |
10 |
Timer 指定された遅延後に単一のアイテムを放出するObservableを作成します。 |
オペレーターの作成例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.map(String::toUpperCase)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
ABCDEFG
RxJava-変換演算子
以下は、Observableから放出されたアイテムを変換するために使用される演算子です。
Sr.No. | Operator & Description |
---|---|
1 |
Buffer Observableからアイテムを定期的にバンドルに収集し、アイテムではなくバンドルを発行します。 |
2 |
FlatMap ネストされたオブザーバブルで使用されます。 アイテムをオブザーバブルに変換します。 次に、アイテムを単一のObservableにフラット化します。 |
3 |
GroupBy Observableをキーごとに編成されたObservableのセットに分割し、アイテムの異なるグループを発行します。 |
4 |
Map 放出される各アイテムに関数を適用して変換します。 |
5 |
Scan 放出された各アイテムに関数を順番に適用し、連続した値を放出します。 |
6 |
Window ObservableからObservableウィンドウにアイテムを定期的に収集し、アイテムではなくウィンドウを放出します。 |
変換演算子の例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.map(String::toUpperCase)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
ABCDEFG
RxJava-フィルタリング演算子
以下は、Observableから項目を選択的に送信するために使用される演算子です。
Sr.No. | Operator & Description |
---|---|
1 |
Debounce 別のアイテムを発行せずにタイムアウトが発生した場合にのみ、アイテムを発行します。 |
2 |
Distinct 一意のアイテムのみを発行します。 |
3 |
ElementAt Observableによって発行されたnインデックスのアイテムのみを発行します。 |
4 |
Filter 指定された述語関数を渡すアイテムのみを発行します。 |
5 |
First 指定された基準に合格した最初のアイテムまたは最初のアイテムを発行します。 |
6 |
IgnoreElements Observableからアイテムを発行せず、完了をマークします。 |
7 |
Last Observableから最後の要素を発行します。 |
8 |
Sample 指定された時間間隔で最新のアイテムを発行します。 |
9 |
Skip Observableから最初のn個のアイテムをスキップします。 |
10 |
SkipLast Observableの最後のn個のアイテムをスキップします。 |
11 |
Take Observableから最初のn個のアイテムを取得します。 |
12 |
TakeLast Observableから最後のn個のアイテムを取得します。 |
フィルタリング演算子の例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.take(2)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
ab
RxJava-演算子の組み合わせ
以下は、複数のObservableから単一のObservableを作成するために使用される演算子です。
Sr.No. | Operator & Description |
---|---|
1 |
And/Then/When パターンとプランの仲介を使用してアイテムセットを結合します。 |
2 |
CombineLatest 指定された関数を介して各Observableから放出された最新のアイテムを結合し、結果のアイテムを放出します。 |
3 |
Join 2番目のObservableから放出されたアイテムの時間枠中に放出された場合、2つのObservableから放出されたアイテムを結合します。 |
4 |
Merge Observablesから放出されたアイテムを組み合わせます。 |
5 |
StartWith ソースのObservableからアイテムの発行を開始する前に、指定されたアイテムのシーケンスを発行します |
6 |
Switch Observablesによって放出された最新のアイテムを放出します。 |
7 |
Zip 機能に基づいてObservablesのアイテムを結合し、結果のアイテムを出力します。 |
結合演算子の例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
public static void main(String[] args) {
Integer[] numbers = { 1, 2, 3, 4, 5, 6};
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable1 = Observable.fromArray(letters);
Observable<Integer> observable2 = Observable.fromArray(numbers);
Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
g1g2g3g4g5g6
RxJava-ユーティリティオペレーター
以下は、Observableで役立つことが多い演算子です。
Sr.No. | Operator & Description |
---|---|
1 |
Delay Observableライフサイクルイベントを処理するアクションを登録します。 |
2 |
Materialize/Dematerialize 発行されたアイテムと送信された通知を表します。 |
3 |
ObserveOn 監視するスケジューラを指定します。 |
4 |
Serialize Observableが強制的にシリアル化された呼び出しを行うようにします。 |
5 |
Subscribe Observableからcompleteなどのアイテムと通知の放出を操作します |
6 |
SubscribeOn Observableがサブスクライブされるときに使用されるスケジューラを指定します。 |
7 |
TimeInterval Observableを変換して、放出と放出の間の経過時間の指標を放出します。 |
8 |
Timeout 指定された時間が発生した場合、アイテムを出力しないでエラー通知を発行します。 |
9 |
Timestamp 放出される各アイテムにタイムスタンプを添付します。 |
9 |
Using 使い捨てリソースまたはObservableと同じ寿命を作成します。 |
ユーティリティオペレーターの例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
abcdefg
RxJava-条件演算子
以下は、1つまたは複数のObservableまたは放出されたアイテムを評価する演算子です。
Sr.No. | Operator & Description |
---|---|
1 |
All 指定された基準を満たすために放出されたすべてのアイテムを評価します。 |
2 |
Amb 複数のObservableを指定した場合にのみ、最初のObservableからすべてのアイテムを発行します。 |
3 |
Contains Observableが特定のアイテムを放出するかどうかを確認します。 |
4 |
DefaultIfEmpty Observableが何も発行しない場合、デフォルトのアイテムを発行します。 |
5 |
SequenceEqual 2つのObservableが同じアイテムシーケンスを放出するかどうかをチェックします。 |
6 |
SkipUntil 2番目のObservableがアイテムを放出するまで、最初のObservableによって放出されたアイテムを破棄します。 |
7 |
SkipWhile 特定の条件がfalseになるまで、Observableによって発行されたアイテムを破棄します。 |
8 |
TakeUntil 2番目のObservableがアイテムを発行または終了した後、Observableが発行したアイテムを破棄します。 |
9 |
TakeWhile 指定された条件がfalseになった後、Observableによって発行されたアイテムを破棄します。 |
条件演算子の例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result = new StringBuilder();
Observable.empty()
.defaultIfEmpty("No Data")
.subscribe(s -> result.append(s));
System.out.println(result);
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result1 = new StringBuilder();
Observable.fromArray(letters)
.firstElement()
.defaultIfEmpty("No data")
.subscribe(s -> result1.append(s));
System.out.println(result1);
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
No Data
a
RxJava-数学演算子
以下は、Observableによって放出されるアイテム全体を操作する演算子です。
Sr.No. | Operator & Description |
---|---|
1 |
Average すべてのアイテムの平均を評価し、結果を出力します。 |
2 |
Concat インターリーブせずに複数のObservableからすべてのアイテムを発行します。 |
3 |
Count すべてのアイテムをカウントし、結果を出力します。 |
4 |
Max すべてのアイテムの最大値アイテムを評価し、結果を出力します。 |
5 |
Min すべてのアイテムの最小値アイテムを評価し、結果を出力します。 |
6 |
Reduce 各アイテムに関数を適用し、結果を返します。 |
7 |
Sum すべてのアイテムの合計を評価し、結果を出力します。 |
数学演算子の例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Integer[] numbers = { 1, 2, 3, 4, 5, 6};
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable1 = Observable.fromArray(letters);
Observable<Integer> observable2 = Observable.fromArray(numbers);
Observable.concat(observable1, observable2)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
abcdefg123456
RxJava-接続可能なオペレーター
以下は、サブスクリプションをより正確に制御できるオペレーターです。
Sr.No. | Operator & Description |
---|---|
1 |
Connect 接続可能なObservableに、サブスクライバーにアイテムを発行するように指示します。 |
2 |
Publish Observableを接続可能なObservableに変換します。 |
3 |
RefCount 接続可能なObservableを通常のObservableに変換します。 |
4 |
Replay Observableがアイテムの発行を開始し、サブスクライバーが後でサブスクライブした後でも、各サブスクライバーが同じ順序で発行されたアイテムを見るようにします。 |
接続可能なオペレーターの例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();
connectable.subscribe(letter -> result.append(letter));
System.out.println(result.length());
connectable.connect();
System.out.println(result.length());
System.out.println(result);
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
0
7
abcdefg
RxJava-サブジェクト
Reactiveに従って、サブジェクトはオブザーバとしてだけでなく、オブザーバとしても機能できます。
'_サブジェクトは、オブザーバーとオブザーバブルの両方として機能するReactiveXの一部の実装で利用可能な一種のブリッジまたはプロキシです。 オブザーバーであるため、1つ以上のObservableにサブスクライブでき、Observableであるため、監視するアイテムを再送信することで通過でき、新しいアイテムを送信することもできます。_
主題の4つのタイプがあります-
Sr.No. | Subject & Description |
---|---|
1 |
Publish Subject サブスクリプションの時間後に放出されるアイテムのみを放出します。 |
2 |
Replay Subject Observableをサブスクライブした時期に関係なく、ソースObservableによって発行されたすべてのアイテムを発行します。 |
3 |
Behavior Subject サブスクリプション時に、最新のアイテムを発行し、ソースObservableによって発行されたアイテムを引き続き発行します。 |
4 |
Async Subject 放出が完了すると、ソースObservableによって放出された最後のアイテムを放出します。 |
RxJava-PublishSubject
PublishSubjectは、現在サブスクライブしているオブザーバーにアイテムを発行し、現在または後のオブザーバーに端末イベントを発行します。
クラス宣言
以下は、 io.reactivex.subjects.PublishSubject <T> クラスの宣言です-
public final class PublishSubject<T>
extends Subject<T>
PublishSubjectの例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.subjects.PublishSubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be abcd
System.out.println(result1);
//Output will be d only
//as subscribed after c item emitted.
System.out.println(result2);
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
abcd
d
RxJava-BehaviorSubject
BehaviorSubjectは、観察した最新のアイテムを発行し、その後、サブスクライブされた各オブザーバーにその後のすべての観察されたアイテムを発行します。
クラス宣言
以下は、 io.reactivex.subjects.BehaviorSubject <T> クラスの宣言です-
public final class BehaviorSubject<T>
extends Subject<T>
BehaviorSubjectの例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
BehaviorSubject<String> subject = BehaviorSubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be abcd
System.out.println(result1);
//Output will be cd being BehaviorSubject
//(c is last item emitted before subscribe)
System.out.println(result2);
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
abcd
cd
RxJava-ReplaySubject
ReplaySubjectは、イベントおよびアイテムを現在および後のオブザーバーにリプレイします。
クラス宣言
以下は、 io.reactivex.subjects.ReplaySubject <T> クラスの宣言です-
public final class ReplaySubject<T>
extends Subject<T>
ReplaySubjectの例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.subjects.ReplaySubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
ReplaySubject<String> subject = ReplaySubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be abcd
System.out.println(result1);
//Output will be abcd being ReplaySubject
//as ReplaySubject emits all the items
System.out.println(result2);
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
abcd
abcd
RxJava-AsyncSubject
AsyncSubjectは、完了イベントまたは受信したエラーが続く最後の値のみをObserversに送信します。
クラス宣言
以下は、 io.reactivex.subjects.AsyncSubject <T> クラスの宣言です-
public final class AsyncSubject<T>
extends Subject<T>
AsyncSubjectの例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.subjects. AsyncSubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
AsyncSubject<String> subject = AsyncSubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be d being the last item emitted
System.out.println(result1);
//Output will be d being the last item emitted
System.out.println(result2);
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
d
d
RxJava-スケジューラー
スケジューラーは、マルチスレッド環境で使用され、Observableオペレーターと連携します。
Reactiveに従って、Schedulerは、オペレーターのチェーンが異なるスレッドにどのように適用されるかをスケジュールするために使用されます。
デフォルトでは、Observableとそれに適用する一連の演算子は、Subscribeメソッドが呼び出された同じスレッドで作業を行い、オブザーバーに通知します。 SubscribeOnオペレーターは、Observableが動作する別のスケジューラーを指定することにより、この動作を変更します。 ObserveOnオペレーターは、Observableがオブザーバーに通知を送信するために使用する別のスケジューラーを指定します。
RxJavaで利用可能なスケジューラには次のタイプがあります-
Sr.No. | Scheduler & Description |
---|---|
1 |
Schedulers.computation() 計算作業用のスケジューラを作成して返します。 スケジュールされるスレッドの数は、システムに存在するCPUに依存します。 CPUごとに1つのスレッドが許可されます。 イベントループまたはコールバック操作に最適です。 |
2 |
Schedulers.io() IOにバインドされた作業用のスケジューラを作成して返します。 スレッドプールは必要に応じて拡張できます。 |
3 |
Schedulers.newThread() 作業単位ごとに新しいスレッドを作成するスケジューラを作成して返します。 |
4 |
Schedulers.trampoline() 現在の作業が完了した後に実行される現在のスレッドの作業をキューに入れるスケジューラを作成して返します。 |
4 |
Schedulers.from(java.util.concurrent.Executor executor) エグゼキューターを新しいスケジューラーインスタンスに変換します。 |
RxJava-トランポリンスケジューラ
Schedulers.trampoline()メソッドは、現在の作業の完了後に実行される現在のスレッドの作業をキューに入れるスケジューラを作成して返します。
Schedulers.trampoline()の例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.trampoline()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3
RxJava-NewThreadスケジューラー
Schedulers.newThread()メソッドは、作業単位ごとに新しいスレッドを作成するスケジューラを作成して返します。
Schedulers.newThread()の例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3
RxJava-計算スケジューラ
Schedulers.computation()メソッドは、計算作業用のスケジューラを作成して返します。 スケジュールされるスレッドの数は、システムに存在するCPUに依存します。 CPUごとに1つのスレッドが許可されます。 イベントループまたはコールバック操作に最適です。
Schedulers.computation()の例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.computation()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3
RxJava-IOスケジューラー
Schedulers.io()メソッドは、IOバウンド作業用のスケジューラを作成して返します。 スレッドプールは必要に応じて拡張できます。 I/O集中型の操作に最適です。
Schedulers.io()の例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.io()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3
RxJava-スケジューラーから
Schedulers.from(Executor)メソッドは、Executorを新しいSchedulerインスタンスに変換します。
Schedulers.from(Executor)の例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import java.util.Random;
import java.util.concurrent.Executors;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2
RxJava-バッファリング
バッファリング演算子を使用すると、Observableによって発行されたアイテムをリストまたはバンドルに収集し、アイテムの代わりにそれらのバンドルを発行できます。 以下の例では、9つのアイテムを発行するObservableを作成し、バッファリングを使用して、3つのアイテムが一緒に発行されます。
バッファリングの例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable<Integer> observable = Observable.just(1, 2, 3, 4,
5, 6, 7, 8, 9);
observable.subscribeOn(Schedulers.io())
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.buffer(3)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onNext(List<Integer> integers) {
System.out.println("onNext: ");
for (Integer value : integers) {
System.out.println(value);
}
}
@Override
public void onError(Throwable e) {
System.out.println("Error");
}
@Override
public void onComplete() {
System.out.println("Done! ");
}
});
Thread.sleep(3000);
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!
RxJava-ウィンドウ
ウィンドウ演算子は、バッファー演算子と同様に機能しますが、Observableによって発行されたアイテムをコレクションの代わりに別のObservableに収集し、コレクションの代わりにそれらのObservableを発行できます。 以下の例では、9つのアイテムを放出するObservableを作成し、window演算子を使用して、3つのObservableを一緒に放出します。
ウィンドウ処理の例
C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。
ObservableTester.java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable<Integer> observable = Observable.just(1, 2, 3, 4,
5, 6, 7, 8, 9);
observable.subscribeOn(Schedulers.io())
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.window(3)
.subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onNext(Observable<Integer> integers) {
System.out.println("onNext: ");
integers.subscribe(value -> System.out.println(value));
}
@Override
public void onError(Throwable e) {
System.out.println("Error");
}
@Override
public void onComplete() {
System.out.println("Done! ");
}
});
Thread.sleep(3000);
}
}
結果を確認する
次のように javac コンパイラを使用してクラスをコンパイルします-
C:\RxJava>javac ObservableTester.java
今ObservableTesterを次のように実行します-
C:\RxJava>java ObservableTester
それは次の出力を生成する必要があります-
Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!