Rxjava-quick-guide

提供:Dev Guides
移動先:案内検索

RxJava-概要

RxJavaは、ReactiveXのJavaベースの拡張機能です。 Javaでの実装またはReactiveXプロジェクトを提供します。 RxJavaの主な特徴は次のとおりです。

  • オブザーバーパターンを拡張します。
  • データ/イベントのシーケンスをサポートします。
  • シーケンスを宣言的に一緒に構成する演算子を提供します。
  • スレッド化、同期、スレッドセーフ、および並行データ構造を内部的に処理します。

ReactiveXとは何ですか?

ReactiveXは、さまざまなプログラミング言語にリアクティブプログラミングの概念を提供することを目的としたプロジェクトです。 リアクティブプログラミングとは、データが表示されたときにプログラムが反応するシナリオを指します。 これはイベントベースのプログラミングコンセプトであり、イベントはレジスタオブザーバに伝播できます。

*Reactive* に従って、Observerパターン、Iteratorパターン、および機能パターンのベストを組み合わせています。

正しく行われたオブザーバーパターン。 ReactiveXは、Ob​​serverパターン、Iteratorパターン、および関数型プログラミングの最高のアイデアを組み合わせたものです。

関数型プログラミング

関数型プログラミングは、純粋な関数を使用してソフトウェアを構築することを中心に展開します。 純粋な関数は以前の状態に依存せず、渡された同じパラメーターに対して常に同じ結果を返します。 純粋な関数は、共有オブジェクト、可変データ、およびマルチスレッド環境でよく見られる副作用に関連する問題の回避に役立ちます。

リアクティブプログラミング

リアクティブプログラミングとは、データストリームが非同期的に到着し、到着時に処理されるイベントドリブンプログラミングのことです。

機能的リアクティブプログラミング

RxJavaは両方の概念を一緒に実装し、ストリームのデータは時間とともに変化し、それに応じてコンシューマー関数が反応します。

リアクティブマニフェスト

Reactive Manifestoは、高水準のアプリケーションソフトウェアシステムを示すオンラインドキュメントです。 マニフェストに従って、以下はリアクティブソフトウェアの重要な属性です-

  • 応答-常にタイムリーに応答する必要があります。
  • メッセージドリブン-疎結合を維持するために、コンポーネント間で非同期メッセージパッシングを使用する必要があります。
  • 弾性-高負荷下でも応答性を維持する必要があります。
  • 弾力性-コンポーネントに障害が発生した場合でも、応答性を維持する必要があります。

RxJavaの主要コンポーネント

RxJavaには、ObservablesとObserverの2つの主要コンポーネントがあります。

  • Observable -ゼロ以上のデータを送信できるStreamに類似したオブジェクトを表し、エラーメッセージを送信でき、そのデータのセットを送信しながら速度を制御でき、有限および無限のデータを送信できます。
  • オブザーバー-オブザーバブルのシーケンスのデータをサブスクライブし、オブザーバブルのアイテムごとに反応します。 Observableがデータを送信するたびに、オブザーバーに通知されます。 オブザーバーはデータを1つずつ処理します。

アイテムが存在しないか、前のアイテムのコールバックが返されない場合、オブザーバーに通知されません。

RxJava-環境設定

ローカル環境のセットアップ

RxJavaはJavaのライブラリであるため、最初の要件はJDKをマシンにインストールすることです。

システム要件

JDK 1.5 or above.
Memory No minimum requirement.
Disk Space No minimum requirement.
Operating System No minimum requirement.

手順1-マシンでのJavaインストールの検証

まず、コンソールを開き、作業しているオペレーティングシステムに基づいてjavaコマンドを実行します。

OS Task Command
Windows Open Command Console c:\> java -version
Linux Open Command Terminal $ java -version
Mac Open Terminal machine:< joseph$ java -version

すべてのオペレーティングシステムの出力を確認しましょう-

OS Output
Windows

java version "1.8.0_101"

Java(TM)SEランタイム環境(ビルド1.8.0_101)

Linux

java version "1.8.0_101"

Java(TM)SEランタイム環境(ビルド1.8.0_101)

Mac

java version "1.8.0_101"

Java(TM)SEランタイム環境(ビルド1.8.0_101)

システムにJavaがインストールされていない場合は、次のリンクからJavaソフトウェア開発キット(SDK)をダウンロードしてください。https://www.oracle.com/technetwork/java/javase/downloads/indexl [https://www .oracle.com]。 このチュートリアルのインストールバージョンとしてJava 1.8.0_101を想定しています。

ステップ2-JAVA環境の設定

*JAVA_HOME* 環境変数を設定して、Javaがマシンにインストールされているベースディレクトリの場所を指すようにします。 例えば。
OS Output
Windows Set the environment variable JAVA_HOME to C:\Program Files\Java\jdk1.8.0_101
Linux export JAVA_HOME =/usr/local/java-current
Mac export JAVA_HOME =/Library/Java/Home

Javaコンパイラの場所をシステムパスに追加します。

OS Output
Windows Append the string C:\Program Files\Java\jdk1.8.0_101\bin *at the end of the system variable, Path*.
Linux export PATH = $PATH:$JAVA_HOME/bin/
Mac not required

上記の説明に従って、コマンド java -version を使用してJavaのインストールを確認します。

ステップ3-RxJava2アーカイブのダウンロード

RxJava @ MVNRepositoryおよびその依存関係https://mvnrepository.com/artifact/org.reactivestreams/reactive-からRxJava jarファイルの最新バージョンをダウンロードします。 streams [Reactive Streams @ MVNRepository] . このチュートリアルを書いている時点で、rxjava-2.2.4.jar、reactive-streams-1.0.2.jarをダウンロードし、C:\> RxJavaフォルダーにコピーしました。

OS Archive name
Windows rxjava-2.2.4.jar, reactive-streams-1.0.2.jar
Linux rxjava-2.2.4.jar, reactive-streams-1.0.2.jar
Mac rxjava-2.2.4.jar, reactive-streams-1.0.2.jar

ステップ4-RxJava環境の設定

*RX_JAVA* 環境変数を設定して、RxJava jarがマシンに保存されているベースディレクトリの場所を指すようにします。 RxJavaフォルダーにrxjava-2.2.4.jarとreact-streams-1.0.2.jarを保存したと仮定しましょう。
Sr.No OS & Description
1

Windows

環境変数RX_JAVAをC:\ RxJavaに設定します

2

Linux

エクスポートRX_JAVA =/usr/local/RxJava

3

Mac

エクスポートRX_JAVA =/Library/RxJava

ステップ5-CLASSPATH変数の設定

*CLASSPATH* 環境変数を設定して、RxJava jarの場所を指すようにします。
Sr.No OS & Description
1

Windows

環境変数CLASSPATHを%CLASSPATH%;%RX_JAVA%\ rxjava-2.2.4.jar;%RX_JAVA%\ reactive-streams-1.0.2.jar;。;に設定します。

2

Linux

export CLASSPATH = $ CLASSPATH:$ RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar :.

3

Mac

export CLASSPATH = $ CLASSPATH:$ RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar :.

ステップ6-RxJavaセットアップのテスト

以下に示すようにクラスTestRx.javaを作成します-

import io.reactivex.Flowable;
public class TestRx {
   public static void main(String[] args) {
      Flowable.just("Hello World!").subscribe(System.out::println);
   }
}

手順7-結果の確認

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac Tester.java

出力を確認します。

Hello World!

RxJava-Observableの仕組み

*Observables* は、* Observers(サブスクライバー)*がリッスンするデータのソースを表します。 一言で言えば、Observableはアイテムを発行し、Subscriberはこれらのアイテムを消費します。

観測可能

  • Observableは、サブスクライバがリスニングを開始するとデータを提供します。
  • Observableは任意の数のアイテムを放出できます。
  • Observableは、アイテムがなくても完了のシグナルのみを送信できます。
  • Observableは正常に終了できます。
  • Observableが終了することはありません。 e.g. ボタンは何度でもクリックできます。
  • Observableは、いつでもエラーをスローする場合があります。

加入者

  • Observableは複数のサブスクライバーを持つことができます。
  • Observableがアイテムを発行すると、各サブスクライバーのonNext()メソッドが呼び出されます。
  • Observableがアイテムの発行を完了すると、各サブスクライバーのonComplete()メソッドが呼び出されます。
  • Observableからエラーが発生した場合、各サブスクライバーのonError()メソッドが呼び出されます。

RxJava-オブザーバブルの作成

以下は、オブザーバブルを作成するための基本クラスです。

  • Flowable -0..Nフロー、0またはnアイテムを放出します。 リアクティブストリームとバックプレッシャーをサポートします。
  • Observable -0..Nのフローですが、背圧はありません。
  • Single -1アイテムまたはエラー。 メソッド呼び出しのリアクティブバージョンとして扱うことができます。
  • 完了-アイテムは放出されません。 完了またはエラーのシグナルとして使用されます。 Runnableのリアクティブバージョンとして扱うことができます。
  • MayBe -アイテムなし、または1アイテムが放出されました。 Optionalの事後対応バージョンとして扱うことができます。

以下は、Observableクラスでオブザーバブルを作成する便利なメソッドです。

  • * just(T item)*-指定された(定数参照)アイテムを通知するObservableを返し、完了します。
  • * fromIterable(Iterable source)*-Iterableシーケンスを、シーケンス内のアイテムを放出するObservableSourceに変換します。
  • * fromArray(T …​ items)*-配列内のアイテムを出力するObservableSourceに配列を変換します。
  • * fromCallable(Callable supplier)*-Observableを返します。オブザーバーがサブスクライブすると、指定した関数を呼び出し、その関数から返された値を発行します。
  • * fromFuture(Future future)*-FutureをObservableSourceに変換します。
  • * interval(long initialDelay、long period、TimeUnit unit)*-initialDelayの後に0Lを出力し、その後の各期間の後に数値を増加させるObservableを返します。

RxJava-単一の観測可能

Singleクラスは、単一値の応答を表します。 単一の監視可能オブジェクトは、単一の成功値またはエラーのいずれかのみを出力できます。 onCompleteイベントは発生しません。

クラス宣言

以下は、 io.reactivex.Single <T> クラスの宣言です-

public abstract class Single<T>
   extends Object
      implements SingleSource<T>

プロトコル

以下は、Single Observableが動作する順次プロトコルです-

onSubscribe (onSuccess | onError)?

単一の例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
     //Create the observable
      Single<String> testSingle = Single.just("Hello World");

     //Create an observer
      Disposable disposable = testSingle
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(
         new DisposableSingleObserver<String>() {

         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      });
      Thread.sleep(3000);
     //start observing
      disposable.dispose();
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

Hello World

RxJava-MayBe Observable

MayBeクラスは、遅延応答を表します。 MayBe observableは、単一の成功値を出力することも、値を出力しないこともできます。

クラス宣言

以下は、 io.reactivex.Single <T> クラスの宣言です-

public abstract class Maybe<T>
   extends Object
      implements MaybeSource<T>

プロトコル

以下は、MayBe Observableが動作するシーケンシャルプロトコルです-

onSubscribe (onSuccess | onError | OnComplete)?

MayBeの例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
     //Create an observer
      Disposable disposable = Maybe.just("Hello World")
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      });
      Thread.sleep(3000);
     //start observing
      disposable.dispose();
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

Hello World

RxJava-完全な観察可能

Completableクラスは、遅延応答を表します。 Completable observableは、正常終了またはエラーを示す場合があります。

クラス宣言

以下は、 io.reactivex.Completable クラスの宣言です-

public abstract class Completable
extends Object
implements CompletableSource

プロトコル

以下はCompletable Observableが動作する順次プロトコルです-

onSubscribe (onError | onComplete)?

完成可能な例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {

     //Create an observer
      Disposable disposable = Completable.complete()
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableCompletableObserver() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }
         @Override
         public void onStart() {
            System.out.println("Started!");
         }
         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      });
      Thread.sleep(3000);
     //start observing
      disposable.dispose();
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

Started!
Done!

RxJava-CompositeDisposableの使用

CompositeDisposableクラスは、複数の使い捨てを保持できるコンテナを表し、使い捨てを追加および削除するO(1)の複雑さを提供します。

クラス宣言

以下は、 io.reactivex.disposables.CompositeDisposable クラスの宣言です-

public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer

CompositeDisposableの例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      CompositeDisposable compositeDisposable = new CompositeDisposable();

     //Create an Single observer
      Disposable disposableSingle = Single.just("Hello World")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(
      new DisposableSingleObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      });

     //Create an observer
      Disposable disposableMayBe = Maybe.just("Hi")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      });

      Thread.sleep(3000);

      compositeDisposable.add(disposableSingle);
      compositeDisposable.add(disposableMayBe);

     //start observing
      compositeDisposable.dispose();
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

Hello World
Hi

RxJava-オペレーターの作成

以下は、Observableを作成するために使用される演算子です。

Sr.No. Operator & Description
1

Create

Observableをゼロから作成し、オブザーバーメソッドがプログラムで呼び出すことを許可します。

2

Defer

オブザーバーがサブスクライブするまでObservableを作成しないでください。 各オブザーバーの新しいオブザーバブルを作成します。

3

Empty/Never/Throw

動作が制限されたObservableを作成します。

4

From

オブジェクト/データ構造をObservableに変換します。

5

Interval

指定された時間間隔のギャップで、整数を連続して放出するObservableを作成します。

6

Just

オブジェクトまたはデータ構造をObservableに変換して、同じまたは同じタイプのオブジェクトを発行します。

7

Range

指定された範囲の順序で整数を出力するObservableを作成します。

8

Repeat

整数を繰り返し放出するObservableを作成します。

9

Start

関数の戻り値を発行するObservableを作成します。

10

Timer

指定された遅延後に単一のアイテムを放出するObservableを作成します。

オペレーターの作成例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

ABCDEFG

RxJava-変換演算子

以下は、Observableから放出されたアイテムを変換するために使用される演算子です。

Sr.No. Operator & Description
1

Buffer

Observableからアイテムを定期的にバンドルに収集し、アイテムではなくバンドルを発行します。

2

FlatMap

ネストされたオブザーバブルで使用されます。 アイテムをオブザーバブルに変換します。 次に、アイテムを単一のObservableにフラット化します。

3

GroupBy

Observableをキーごとに編成されたObservableのセットに分割し、アイテムの異なるグループを発行します。

4

Map

放出される各アイテムに関数を適用して変換します。

5

Scan

放出された各アイテムに関数を順番に適用し、連続した値を放出します。

6

Window

ObservableからObservableウィンドウにアイテムを定期的に収集し、アイテムではなくウィンドウを放出します。

変換演算子の例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

ABCDEFG

RxJava-フィルタリング演算子

以下は、Observableから項目を選択的に送信するために使用される演算子です。

Sr.No. Operator & Description
1

Debounce

別のアイテムを発行せずにタイムアウトが発生した場合にのみ、アイテムを発行します。

2

Distinct

一意のアイテムのみを発行します。

3

ElementAt

Observableによって発行されたnインデックスのアイテムのみを発行します。

4

Filter

指定された述語関数を渡すアイテムのみを発行します。

5

First

指定された基準に合格した最初のアイテムまたは最初のアイテムを発行します。

6

IgnoreElements

Observableからアイテムを発行せず、完了をマークします。

7

Last

Observableから最後の要素を発行します。

8

Sample

指定された時間間隔で最新のアイテムを発行します。

9

Skip

Observableから最初のn個のアイテムをスキップします。

10

SkipLast

Observableの最後のn個のアイテムをスキップします。

11

Take

Observableから最初のn個のアイテムを取得します。

12

TakeLast

Observableから最後のn個のアイテムを取得します。

フィルタリング演算子の例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .take(2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

ab

RxJava-演算子の組み合わせ

以下は、複数のObservableから単一のObservableを作成するために使用される演算子です。

Sr.No. Operator & Description
1

And/Then/When

パターンとプランの仲介を使用してアイテムセットを結合します。

2

CombineLatest

指定された関数を介して各Observableから放出された最新のアイテムを結合し、結果のアイテムを放出します。

3

Join

2番目のObservableから放出されたアイテムの時間枠中に放出された場合、2つのObservableから放出されたアイテムを結合します。

4

Merge

Observablesから放出されたアイテムを組み合わせます。

5

StartWith

ソースのObservableからアイテムの発行を開始する前に、指定されたアイテムのシーケンスを発行します

6

Switch

Observablesによって放出された最新のアイテムを放出します。

7

Zip

機能に基づいてObservablesのアイテムを結合し、結果のアイテムを出力します。

結合演算子の例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
   public static void main(String[] args) {
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

g1g2g3g4g5g6

RxJava-ユーティリティオペレーター

以下は、Observableで役立つことが多い演算子です。

Sr.No. Operator & Description
1

Delay

Observableライフサイクルイベントを処理するアクションを登録します。

2

Materialize/Dematerialize

発行されたアイテムと送信された通知を表します。

3

ObserveOn

監視するスケジューラを指定します。

4

Serialize

Observableが強制的にシリアル化された呼び出しを行うようにします。

5

Subscribe

Observableからcompleteなどのアイテムと通知の放出を操作します

6

SubscribeOn

Observableがサブスクライブされるときに使用されるスケジューラを指定します。

7

TimeInterval

Observableを変換して、放出と放出の間の経過時間の指標を放出します。

8

Timeout

指定された時間が発生した場合、アイテムを出力しないでエラー通知を発行します。

9

Timestamp

放出される各アイテムにタイムスタンプを添付します。

9

Using

使い捨てリソースまたはObservableと同じ寿命を作成します。

ユーティリティオペレーターの例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable.subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

abcdefg

RxJava-条件演算子

以下は、1つまたは複数のObservableまたは放出されたアイテムを評価する演算子です。

Sr.No. Operator & Description
1

All

指定された基準を満たすために放出されたすべてのアイテムを評価します。

2

Amb

複数のObservableを指定した場合にのみ、最初のObservableからすべてのアイテムを発行します。

3

Contains

Observableが特定のアイテムを放出するかどうかを確認します。

4

DefaultIfEmpty

Observableが何も発行しない場合、デフォルトのアイテムを発行します。

5

SequenceEqual

2つのObservableが同じアイテムシーケンスを放出するかどうかをチェックします。

6

SkipUntil

2番目のObservableがアイテムを放出するまで、最初のObservableによって放出されたアイテムを破棄します。

7

SkipWhile

特定の条件がfalseになるまで、Observableによって発行されたアイテムを破棄します。

8

TakeUntil

2番目のObservableがアイテムを発行または終了した後、Observableが発行したアイテムを破棄します。

9

TakeWhile

指定された条件がfalseになった後、Observableによって発行されたアイテムを破棄します。

条件演算子の例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result = new StringBuilder();
      Observable.empty()
      .defaultIfEmpty("No Data")
      .subscribe(s -> result.append(s));
      System.out.println(result);
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result1 = new StringBuilder();
      Observable.fromArray(letters)
      .firstElement()
      .defaultIfEmpty("No data")
      .subscribe(s -> result1.append(s));
      System.out.println(result1);
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

No Data
a

RxJava-数学演算子

以下は、Observableによって放出されるアイテム全体を操作する演算子です。

Sr.No. Operator & Description
1

Average

すべてのアイテムの平均を評価し、結果を出力します。

2

Concat

インターリーブせずに複数のObservableからすべてのアイテムを発行します。

3

Count

すべてのアイテムをカウントし、結果を出力します。

4

Max

すべてのアイテムの最大値アイテムを評価し、結果を出力します。

5

Min

すべてのアイテムの最小値アイテムを評価し、結果を出力します。

6

Reduce

各アイテムに関数を適用し、結果を返します。

7

Sum

すべてのアイテムの合計を評価し、結果を出力します。

数学演算子の例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.concat(observable1, observable2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

abcdefg123456

RxJava-接続可能なオペレーター

以下は、サブスクリプションをより正確に制御できるオペレーターです。

Sr.No. Operator & Description
1

Connect

接続可能なObservableに、サブスクライバーにアイテムを発行するように指示します。

2

Publish

Observableを接続可能なObservableに変換します。

3

RefCount

接続可能なObservableを通常のObservableに変換します。

4

Replay

Observableがアイテムの発行を開始し、サブスクライバーが後でサブスクライブした後でも、各サブスクライバーが同じ順序で発行されたアイテムを見るようにします。

接続可能なオペレーターの例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();
      connectable.subscribe(letter -> result.append(letter));
      System.out.println(result.length());
      connectable.connect();
      System.out.println(result.length());
      System.out.println(result);
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

0
7
abcdefg

RxJava-サブジェクト

Reactiveに従って、サブジェクトはオブザーバとしてだけでなく、オブザーバとしても機能できます。

'_サブジェクトは、オブザーバーとオブザーバブルの両方として機能するReactiveXの一部の実装で利用可能な一種のブリッジまたはプロキシです。 オブザーバーであるため、1つ以上のObservableにサブスクライブでき、Observableであるため、監視するアイテムを再送信することで通過でき、新しいアイテムを送信することもできます。_

主題の4つのタイプがあります-

Sr.No. Subject & Description
1

Publish Subject

サブスクリプションの時間後に放出されるアイテムのみを放出します。

2

Replay Subject

Observableをサブスクライブした時期に関係なく、ソースObservableによって発行されたすべてのアイテムを発行します。

3

Behavior Subject

サブスクリプション時に、最新のアイテムを発行し、ソースObservableによって発行されたアイテムを引き続き発行します。

4

Async Subject

放出が完了すると、ソースObservableによって放出された最後のアイテムを放出します。

RxJava-PublishSubject

PublishSubjectは、現在サブスクライブしているオブザーバーにアイテムを発行し、現在または後のオブザーバーに端末イベントを発行します。

クラス宣言

以下は、 io.reactivex.subjects.PublishSubject <T> クラスの宣言です-

public final class PublishSubject<T>
extends Subject<T>

PublishSubjectの例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.subjects.PublishSubject;
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();

      PublishSubject<String> subject = PublishSubject.create();
      subject.subscribe(value -> result1.append(value) );
      subject.onNext("a");
      subject.onNext("b");
      subject.onNext("c");
      subject.subscribe(value -> result2.append(value));
      subject.onNext("d");
      subject.onComplete();

     //Output will be abcd
      System.out.println(result1);
     //Output will be d only
     //as subscribed after c item emitted.
      System.out.println(result2);
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

abcd
d

RxJava-BehaviorSubject

BehaviorSubjectは、観察した最新のアイテムを発行し、その後、サブスクライブされた各オブザーバーにその後のすべての観察されたアイテムを発行します。

クラス宣言

以下は、 io.reactivex.subjects.BehaviorSubject <T> クラスの宣言です-

public final class BehaviorSubject<T>
extends Subject<T>

BehaviorSubjectの例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();
      BehaviorSubject<String> subject =  BehaviorSubject.create();
      subject.subscribe(value -> result1.append(value) );
      subject.onNext("a");
      subject.onNext("b");
      subject.onNext("c");
      subject.subscribe(value -> result2.append(value));
      subject.onNext("d");
      subject.onComplete();
     //Output will be abcd
      System.out.println(result1);
     //Output will be cd being BehaviorSubject
     //(c is last item emitted before subscribe)
      System.out.println(result2);
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

abcd
cd

RxJava-ReplaySubject

ReplaySubjectは、イベントおよびアイテムを現在および後のオブザーバーにリプレイします。

クラス宣言

以下は、 io.reactivex.subjects.ReplaySubject <T> クラスの宣言です-

public final class ReplaySubject<T>
extends Subject<T>

ReplaySubjectの例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.subjects.ReplaySubject;
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();

      ReplaySubject<String> subject = ReplaySubject.create();
      subject.subscribe(value -> result1.append(value) );
      subject.onNext("a");
      subject.onNext("b");
      subject.onNext("c");
      subject.subscribe(value -> result2.append(value));
      subject.onNext("d");
      subject.onComplete();

     //Output will be abcd
      System.out.println(result1);
     //Output will be abcd being ReplaySubject
     //as ReplaySubject emits all the items
      System.out.println(result2);
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

abcd
abcd

RxJava-AsyncSubject

AsyncSubjectは、完了イベントまたは受信したエラーが続く最後の値のみをObserversに送信します。

クラス宣言

以下は、 io.reactivex.subjects.AsyncSubject <T> クラスの宣言です-

public final class  AsyncSubject<T>
extends Subject<T>

AsyncSubjectの例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.subjects. AsyncSubject;
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();

      AsyncSubject<String> subject =  AsyncSubject.create();
      subject.subscribe(value -> result1.append(value) );
      subject.onNext("a");
      subject.onNext("b");
      subject.onNext("c");
      subject.subscribe(value -> result2.append(value));
      subject.onNext("d");
      subject.onComplete();

     //Output will be d being the last item emitted
      System.out.println(result1);
     //Output will be d being the last item emitted
      System.out.println(result2);
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

d
d

RxJava-スケジューラー

スケジューラーは、マルチスレッド環境で使用され、Observableオペレーターと連携します。

Reactiveに従って、Schedulerは、オペレーターのチェーンが異なるスレッドにどのように適用されるかをスケジュールするために使用されます。

デフォルトでは、Observableとそれに適用する一連の演算子は、Subscribeメソッドが呼び出された同じスレッドで作業を行い、オブザーバーに通知します。 SubscribeOnオペレーターは、Observableが動作する別のスケジューラーを指定することにより、この動作を変更します。 ObserveOnオペレーターは、Observableがオブザーバーに通知を送信するために使用する別のスケジューラーを指定します。

RxJavaで利用可能なスケジューラには次のタイプがあります-

Sr.No. Scheduler & Description
1

Schedulers.computation()

計算作業用のスケジューラを作成して返します。 スケジュールされるスレッドの数は、システムに存在するCPUに依存します。 CPUごとに1つのスレッドが許可されます。 イベントループまたはコールバック操作に最適です。

2

Schedulers.io()

IOにバインドされた作業用のスケジューラを作成して返します。 スレッドプールは必要に応じて拡張できます。

3

Schedulers.newThread()

作業単位ごとに新しいスレッドを作成するスケジューラを作成して返します。

4

Schedulers.trampoline()

現在の作業が完了した後に実行される現在のスレッドの作業をキューに入れるスケジューラを作成して返します。

4

Schedulers.from(java.util.concurrent.Executor executor)

エグゼキューターを新しいスケジューラーインスタンスに変換します。

RxJava-トランポリンスケジューラ

Schedulers.trampoline()メソッドは、現在の作業の完了後に実行される現在のスレッドの作業をキューに入れるスケジューラを作成して返します。

Schedulers.trampoline()の例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.trampoline()))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3

RxJava-NewThreadスケジューラー

Schedulers.newThread()メソッドは、作業単位ごとに新しいスレッドを作成するスケジューラを作成して返します。

Schedulers.newThread()の例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.newThread()))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3

RxJava-計算スケジューラ

Schedulers.computation()メソッドは、計算作業用のスケジューラを作成して返します。 スケジュールされるスレッドの数は、システムに存在するCPUに依存します。 CPUごとに1つのスレッドが許可されます。 イベントループまたはコールバック操作に最適です。

Schedulers.computation()の例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.computation()))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3

RxJava-IOスケジューラー

Schedulers.io()メソッドは、IOバウンド作業用のスケジューラを作成して返します。 スレッドプールは必要に応じて拡張できます。 I/O集中型の操作に最適です。

Schedulers.io()の例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.io()))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3

RxJava-スケジューラーから

Schedulers.from(Executor)メソッドは、Executorを新しいSchedulerインスタンスに変換します。

Schedulers.from(Executor)の例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import java.util.Random;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2

RxJava-バッファリング

バッファリング演算子を使用すると、Observableによって発行されたアイテムをリストまたはバンドルに収集し、アイテムの代わりにそれらのバンドルを発行できます。 以下の例では、9つのアイテムを発行するObservableを作成し、バッファリングを使用して、3つのアイテムが一緒に発行されます。

バッファリングの例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .buffer(3)
         .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(List<Integer> integers) {
               System.out.println("onNext: ");
               for (Integer value : integers) {
                  System.out.println(value);
               }
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!

RxJava-ウィンドウ

ウィンドウ演算子は、バッファー演算子と同様に機能しますが、Observableによって発行されたアイテムをコレクションの代わりに別のObservableに収集し、コレクションの代わりにそれらのObservableを発行できます。 以下の例では、9つのアイテムを放出するObservableを作成し、window演算子を使用して、3つのObservableを一緒に放出します。

ウィンドウ処理の例

C:\> RxJavaなどで選択したエディターを使用して、次のJavaプログラムを作成します。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .window(3)
         .subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(Observable<Integer> integers) {
               System.out.println("onNext: ");
               integers.subscribe(value -> System.out.println(value));
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

結果を確認する

次のように javac コンパイラを使用してクラスをコンパイルします-

C:\RxJava>javac ObservableTester.java

今ObservableTesterを次のように実行します-

C:\RxJava>java ObservableTester

それは次の出力を生成する必要があります-

Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!