BLOG

ブログ

【Angular】RxJSのObservableで非同期処理を実装してみた

こんにちわ。崩壊スターレイル(ゲーム)が熱い苅田です。

最近、RxJSのObservableを使用して開発する機会があり、初見の理解しにくさも相まって備忘のために記事を書くことにしました。
同じようにObservableの理解に苦戦している人の助けになれれば幸いです。

動作環境

  • M1 Mac(macOS Sonoma)
  • Angular:18.0.0
  • Node:22.2.0
  • RxJS:7.8.0

RxJSとは?

RxJS (Reactive Extensions for JavaScript) は、非同期処理、もしくはイベントベースの処理を容易に扱うためのライブラリです。
Observableというデータ構造を使い、イベントの発生やデータの変更を監視し、それに対しリアクティブに処理を行うことができます。

以下、Observableを使用した実装例となります。

import { Observable, Subscription } from 'rxjs';

export class TestComponent {
  // Subscriptionを入れる変数
  subscription?: Subscription;

  ngOnInit(): void {
    // Observableオブジェクトのコールバック処理を定義
    // subscribe(購読)を開始すると、コールバック内の処理が走る
    const observable$ = new Observable((subscriber) => {
      // subscriber(購読者)がnext()でObserverに値を流す
      subscriber.next(1);
      subscriber.next(2);
      subscriber.next(3);
      // subscriber(購読者)がcomplete()でObserverに処理完了を通知する
      subscriber.complete();
    });

    // Observerオブジェクト
    const observer = {
      // Observableから新しい値が流れてきた時に呼び出される
      next: (res: any) => console.log(res),
      // Observableからエラーが流れてきた時に呼び出される
      error: (err: Error) => console.error('エラー:', err),
      // Observableから明示的に処理の完了が通知されたときに呼び出される
      complete: () => console.log('完了'),
    };

    // subscribe(購読)したObservableの返り値が、Subscriptionに格納される
    this.subscription = observable$.subscribe(observer);
  }

  ngOnDestroy() {
    // 購読していたSubscriptionを解除して、購読を停止
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
  }
}
// 出力: 1, 2, 3, 4, 5, 完了

Observableとは?

Observableは、イベントやデータをストリーム(流れ)として扱うオブジェクトです。
これは、時間の経過とともに発生する1つ以上の値(イベントやAPIレスポンスなど)が流れるオブジェクトなります。

Observableはsubscribeというメソッドを持ち、subscribeメソッドによって購読が開始され、購読されるとObservable内に定義された処理が開始されます。
subscribeメソッドは返り値としてSubscriptionオブジェクト(後述)を返します。

    // Observableオブジェクトのコールバック処理を定義
    // subscribe(購読)を開始すると、コールバック内の処理が走る
    const observable$ = new Observable((subscriber) => {
      // subscriber(購読者)がnext()でObserverに値を流す
      subscriber.next(1);
      subscriber.next(2);
      subscriber.next(3);
      // subscriber(購読者)がcomplete()でObserverに処理完了を通知する
      subscriber.complete();
    });

Observerとは?

Observerは、Observableからの通知を受け取るためのオブジェクトで、以下の3つのコールバックメソッドを持ちます。

  • next(value): Observableから新しい値が流れてくるたびに呼び出されます。引数のvalueはObservableから受け取った値となります。
  • error(err):Observableからエラーを受け取った際に呼び出されます。引数のerrはObservableから受け取ったエラーとなります。
    error()が呼び出されるとストリームはクローズとなり、ストリームの処理はそのまま実行されるもののnext()による値の通知はできなくなります。
  • complete() :Observableが全ての値の流し終えた際に呼び出されます。

Observableの中の処理が走り、next()やcomplete()が流れたときに、Observerのコールバックメソッドの処理が実行されます。

    // Observerオブジェクト
    const observer = {
      // Observableから新しい値が流れてきた時に呼び出される
      next: (res: any) => console.log(res),
      // Observableからエラーが流れてきた時に呼び出される
      error: (err: Error) => console.error('エラー:', err),
      // Observableから明示的に処理の完了が通知されたときに呼び出される
      complete: () => console.log('完了'),
    };

Subscriptionとは?

Subscriptionは、Observableのデータストリームに対する購読を管理するオブジェクトです。
上述した通り、Observableのsubscribeメソッドの返り値として返却されます。

Subscriptionは画面遷移しても残るため、不要になったSubscriptionは停止してあげる必要があります。
Subscription オブジェクトのunsubscribeメソッドを呼び出すことで、購読を停止することができます。(以下では、Angularコンポーネントが破棄されるタイミングであるOnDestroy()時にunsubscribeメソッドを呼び出しています)


    // subscribe(購読)したObservableの返り値が、Subscriptionに格納される
    this.subscription = observable$.subscribe(observer);
  }

  ngOnDestroy() {
    // 購読していたSubscriptionを解除して、購読を停止
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
  }

処理の流れについておさらい

サンプルコードの場合を例に、以下のように処理が走ります。

  1. Observableオブジェクトを生成する
  2. Observerオブジェクトで購読開始時の処理を定義する
  3. Observableのsubscribe()で購読が開始され、Observable内でnextやcompleteが呼ばれるたび、Observer内のコールバックメソッドの処理が走る
  4. 購読が不要になったタイミングで、unsubscribe()で購読を停止する

上記のサンプルコードが正常に走った場合、コンソール上には以下のように表示されるはずです。

Operatorsについて

Operators(オペレーター)は、Observableのデータストリームを変換、フィルタリング、結合するための関数です。
Operatorsを使用することで、非同期データをより効率的に処理できます。
色んな関数が存在するのですが、主要と思われるOperatorsのみをいくつか紹介します。

of

ofオペレーターは、引数として与えられた値をそのまま発行するシンプルなObservableを作成します。
発行された値は先ほどのサンプルコード同様、Observerのnextで呼び出され、最終的にはcompletedが呼び出されます。

import { of } from 'rxjs';

const observable = of(1, 2, 3, 4, 5);

observable.subscribe({
  next: value => console.log(value),
  complete: () => console.log('Completed')
});
// 出力: 1, 2, 3, 4, 5, 完了

map

mapオペレーターは、Observableから発行される各値に対して関数内の処理を適用し、その結果を新しいObservableとして返します。

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

const observable = of(1, 2, 3);
const mappedObservable = observable.pipe(map(x => x * 2));

mappedObservable.subscribe(value => console.log(value));
// 出力: 2, 4, 6

filter

filterオペレーターは、指定した条件に一致する値のみを通知し、それ以外の値をフィルタリングします。

import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

const observable = of(1, 2, 3, 4, 5);
const filteredObservable = observable.pipe(filter(x => x % 2 === 0));

filteredObservable.subscribe(value => console.log(value));
// 出力: 2, 4

take

takeオペレーターは、Observableから最初のN個の値を取得し、それ以降は無視します。

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

const observable = interval(1000);
const limitedObservable = observable.pipe(take(3));

limitedObservable.subscribe(value => console.log(value));
// 出力: 0, 1, 2

さいごに

いかがだったでしょうか。

Observableについて公式ドキュメントを見てもなかなか理解できず、色んな記事を参考にしました。
今回はObservableのシンプルな動きの紹介でしたが、応用的な動きに関しても別記事で書けたらと思います。